Add common-subexpression support.
diff --git a/expressions/CMakeLists.txt b/expressions/CMakeLists.txt
index b1f1fb1..33606cd 100644
--- a/expressions/CMakeLists.txt
+++ b/expressions/CMakeLists.txt
@@ -25,12 +25,16 @@
                          expressions_Expressions_proto_hdrs
                          Expressions.proto)
 
+add_library(quickstep_expressions_Expression ../empty_src.cpp Expression.hpp)
 add_library(quickstep_expressions_ExpressionFactories
             ExpressionFactories.cpp
             ExpressionFactories.hpp)
 add_library(quickstep_expressions_Expressions_proto
             ${expressions_Expressions_proto_srcs})
 
+target_link_libraries(quickstep_expressions_Expression
+                      quickstep_utility_Macros
+                      quickstep_utility_TreeStringSerializable)
 target_link_libraries(quickstep_expressions_ExpressionFactories
                       glog
                       quickstep_catalog_CatalogDatabaseLite
@@ -48,6 +52,7 @@
                       quickstep_expressions_scalar_ScalarBinaryExpression
                       quickstep_expressions_scalar_ScalarCaseExpression
                       quickstep_expressions_scalar_ScalarLiteral
+                      quickstep_expressions_scalar_ScalarSharedExpression
                       quickstep_expressions_scalar_ScalarUnaryExpression
                       quickstep_types_TypeFactory
                       quickstep_types_TypedValue
@@ -64,6 +69,7 @@
 # Module all-in-one library:
 add_library(quickstep_expressions ../empty_src.cpp ExpressionsModule.hpp)
 target_link_libraries(quickstep_expressions
+                      quickstep_expressions_Expression
                       quickstep_expressions_ExpressionFactories
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_aggregation
diff --git a/expressions/Expression.hpp b/expressions/Expression.hpp
new file mode 100644
index 0000000..39815c3
--- /dev/null
+++ b/expressions/Expression.hpp
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_EXPRESSION_HPP_
+#define QUICKSTEP_EXPRESSIONS_EXPRESSION_HPP_
+
+#include "utility/Macros.hpp"
+#include "utility/TreeStringSerializable.hpp"
+
+namespace quickstep {
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief Base class for all expressions (scalars and predicates) in the backend.
+ */
+class Expression : public TreeStringSerializable<const Expression*> {
+ public:
+  /**
+   * @brief Virtual destructor.
+   **/
+  virtual ~Expression() {}
+
+ protected:
+  Expression() {}
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Expression);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_EXPRESSION_HPP_
diff --git a/expressions/ExpressionFactories.cpp b/expressions/ExpressionFactories.cpp
index 01d22a0..871db50 100644
--- a/expressions/ExpressionFactories.cpp
+++ b/expressions/ExpressionFactories.cpp
@@ -39,6 +39,7 @@
 #include "expressions/scalar/ScalarBinaryExpression.hpp"
 #include "expressions/scalar/ScalarCaseExpression.hpp"
 #include "expressions/scalar/ScalarLiteral.hpp"
+#include "expressions/scalar/ScalarSharedExpression.hpp"
 #include "expressions/scalar/ScalarUnaryExpression.hpp"
 #include "types/TypeFactory.hpp"
 #include "types/TypedValue.hpp"
@@ -210,6 +211,11 @@
                                       std::move(result_expressions),
                                       else_result_expression.release());
     }
+    case serialization::Scalar::SHARED_EXPRESSION: {
+      return new ScalarSharedExpression(
+          proto.GetExtension(serialization::ScalarSharedExpression::share_id),
+          ReconstructFromProto(proto.GetExtension(serialization::ScalarSharedExpression::operand), database));
+    }
     default:
       FATAL_ERROR("Unknown Scalar data source in ScalarFactory::ReconstructFromProto");
   }
@@ -302,6 +308,10 @@
       // Everything checks out.
       return true;
     }
+    case serialization::Scalar::SHARED_EXPRESSION: {
+      return proto.HasExtension(serialization::ScalarSharedExpression::share_id)
+             && proto.HasExtension(serialization::ScalarSharedExpression::operand);
+    }
     default: {
       break;
     }
diff --git a/expressions/Expressions.proto b/expressions/Expressions.proto
index 8d923c5..8b4611e 100644
--- a/expressions/Expressions.proto
+++ b/expressions/Expressions.proto
@@ -50,6 +50,7 @@
     UNARY_EXPRESSION = 2;
     BINARY_EXPRESSION = 3;
     CASE_EXPRESSION = 4;
+    SHARED_EXPRESSION = 5;
   }
 
   required ScalarDataSource data_source = 1;
@@ -123,3 +124,10 @@
     optional Scalar else_result_expression = 163;
   }
 }
+
+message ScalarSharedExpression {
+  extend Scalar {
+    optional int32 share_id = 196;
+    optional Scalar operand = 197;
+  }
+}
diff --git a/expressions/predicate/CMakeLists.txt b/expressions/predicate/CMakeLists.txt
index b90562c..04abfc7 100644
--- a/expressions/predicate/CMakeLists.txt
+++ b/expressions/predicate/CMakeLists.txt
@@ -35,7 +35,7 @@
             ../../empty_src.cpp
             PredicateCost.hpp)
 add_library(quickstep_expressions_predicate_PredicateWithList
-            ../../empty_src.cpp
+            PredicateWithList.cpp
             PredicateWithList.hpp)
 add_library(quickstep_expressions_predicate_TrivialPredicates
             ../../empty_src.cpp
@@ -61,6 +61,7 @@
                       quickstep_types_containers_ColumnVector
                       quickstep_types_operations_Operation_proto
                       quickstep_types_operations_comparisons_Comparison
+                      quickstep_types_operations_comparisons_ComparisonID
                       quickstep_utility_Macros
                       quickstep_utility_PtrVector)
 target_link_libraries(quickstep_expressions_predicate_ConjunctionPredicate
@@ -92,6 +93,7 @@
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_predicate_Predicate
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_Expression
                       quickstep_expressions_Expressions_proto
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_TupleIdSequence
diff --git a/expressions/predicate/ComparisonPredicate.cpp b/expressions/predicate/ComparisonPredicate.cpp
index 5f8612e..2f7b84b 100644
--- a/expressions/predicate/ComparisonPredicate.cpp
+++ b/expressions/predicate/ComparisonPredicate.cpp
@@ -23,6 +23,7 @@
 #include <limits>
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -41,6 +42,7 @@
 #include "types/containers/ColumnVector.hpp"
 #include "types/operations/Operation.pb.h"
 #include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/ComparisonID.hpp"
 #include "utility/Macros.hpp"
 #include "utility/PtrVector.hpp"
 
@@ -190,18 +192,20 @@
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
     if (short_circuit_adapter) {
-      std::unique_ptr<ColumnVector> right_values(right_operand_->getAllValues(
+      ColumnVectorPtr right_values(right_operand_->getAllValues(
           short_circuit_adapter.get(),
-          sub_blocks_ref));
+          sub_blocks_ref,
+          nullptr /* cv_cache */));
       return fast_comparator_->compareStaticValueAndColumnVector(
           left_operand_->getStaticValue(),
           *right_values,
           nullptr,
           filter);
     } else {
-      std::unique_ptr<ColumnVector> right_values(right_operand_->getAllValues(
+      ColumnVectorPtr right_values(right_operand_->getAllValues(
           accessor,
-          sub_blocks_ref));
+          sub_blocks_ref,
+          nullptr /* cv_cache */));
       return fast_comparator_->compareStaticValueAndColumnVector(
           left_operand_->getStaticValue(),
           *right_values,
@@ -222,18 +226,20 @@
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
     if (short_circuit_adapter) {
-      std::unique_ptr<ColumnVector> left_values(left_operand_->getAllValues(
+      ColumnVectorPtr left_values(left_operand_->getAllValues(
           short_circuit_adapter.get(),
-          sub_blocks_ref));
+          sub_blocks_ref,
+          nullptr /* cv_cache */));
       return fast_comparator_->compareColumnVectorAndStaticValue(
           *left_values,
           right_operand_->getStaticValue(),
           nullptr,
           filter);
     } else {
-      std::unique_ptr<ColumnVector> left_values(left_operand_->getAllValues(
+      ColumnVectorPtr left_values(left_operand_->getAllValues(
           accessor,
-          sub_blocks_ref));
+          sub_blocks_ref,
+          nullptr /* cv_cache */));
       return fast_comparator_->compareColumnVectorAndStaticValue(
           *left_values,
           right_operand_->getStaticValue(),
@@ -255,9 +261,10 @@
                                                             filter);
       } else {
         if (short_circuit_adapter) {
-          std::unique_ptr<ColumnVector> right_values(right_operand_->getAllValues(
+          ColumnVectorPtr right_values(right_operand_->getAllValues(
               short_circuit_adapter.get(),
-              sub_blocks_ref));
+              sub_blocks_ref,
+              nullptr /* cv_cache */));
           return fast_comparator_->compareValueAccessorAndColumnVector(
               short_circuit_adapter.get(),
               left_operand_attr_id,
@@ -265,9 +272,10 @@
               nullptr,
               filter);
         } else {
-          std::unique_ptr<ColumnVector> right_values(right_operand_->getAllValues(
+          ColumnVectorPtr right_values(right_operand_->getAllValues(
               accessor,
-              sub_blocks_ref));
+              sub_blocks_ref,
+              nullptr /* cv_cache */));
           return fast_comparator_->compareValueAccessorAndColumnVector(accessor,
                                                                        left_operand_attr_id,
                                                                        *right_values,
@@ -277,9 +285,10 @@
       }
     } else if (right_operand_attr_id != -1) {
       if (short_circuit_adapter) {
-        std::unique_ptr<ColumnVector> left_values(left_operand_->getAllValues(
+        ColumnVectorPtr left_values(left_operand_->getAllValues(
             short_circuit_adapter.get(),
-            sub_blocks_ref));
+            sub_blocks_ref,
+            nullptr /* cv_cache */));
         return fast_comparator_->compareColumnVectorAndValueAccessor(
             *left_values,
             short_circuit_adapter.get(),
@@ -287,9 +296,10 @@
             nullptr,
             filter);
       } else {
-        std::unique_ptr<ColumnVector> left_values(left_operand_->getAllValues(
+        ColumnVectorPtr left_values(left_operand_->getAllValues(
             accessor,
-            sub_blocks_ref));
+            sub_blocks_ref,
+            nullptr /* cv_cache */));
         return fast_comparator_->compareColumnVectorAndValueAccessor(*left_values,
                                                                      accessor,
                                                                      right_operand_attr_id,
@@ -300,23 +310,27 @@
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
     if (short_circuit_adapter) {
-      std::unique_ptr<ColumnVector> left_values(left_operand_->getAllValues(
+      ColumnVectorPtr left_values(left_operand_->getAllValues(
           short_circuit_adapter.get(),
-          sub_blocks_ref));
-      std::unique_ptr<ColumnVector> right_values(right_operand_->getAllValues(
+          sub_blocks_ref,
+          nullptr /* cv_cache */));
+      ColumnVectorPtr right_values(right_operand_->getAllValues(
           short_circuit_adapter.get(),
-          sub_blocks_ref));
+          sub_blocks_ref,
+          nullptr /* cv_cache */));
       return fast_comparator_->compareColumnVectors(*left_values,
                                                     *right_values,
                                                     nullptr,
                                                     filter);
     } else {
-      std::unique_ptr<ColumnVector> left_values(left_operand_->getAllValues(
+      ColumnVectorPtr left_values(left_operand_->getAllValues(
           accessor,
-          sub_blocks_ref));
-      std::unique_ptr<ColumnVector> right_values(right_operand_->getAllValues(
+          sub_blocks_ref,
+          nullptr /* cv_cache */));
+      ColumnVectorPtr right_values(right_operand_->getAllValues(
           accessor,
-          sub_blocks_ref));
+          sub_blocks_ref,
+          nullptr /* cv_cache */));
       return fast_comparator_->compareColumnVectors(*left_values,
                                                     *right_values,
                                                     filter,
@@ -373,4 +387,29 @@
   }
 }
 
+void ComparisonPredicate::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<const Expression*> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<const Expression*>> *container_child_fields) const {
+  Predicate::getFieldStringItems(inline_field_names,
+                                 inline_field_values,
+                                 non_container_child_field_names,
+                                 non_container_child_fields,
+                                 container_child_field_names,
+                                 container_child_fields);
+
+  inline_field_names->emplace_back("comparison");
+  inline_field_values->emplace_back(
+      kComparisonNames[static_cast<std::underlying_type<ComparisonID>::type>(
+          comparison_.getComparisonID())]);
+
+  non_container_child_field_names->emplace_back("left_operand");
+  non_container_child_fields->emplace_back(left_operand_.get());
+  non_container_child_field_names->emplace_back("right_operand");
+  non_container_child_fields->emplace_back(right_operand_.get());
+}
+
 }  // namespace quickstep
diff --git a/expressions/predicate/ComparisonPredicate.hpp b/expressions/predicate/ComparisonPredicate.hpp
index 9030857..1ef2cb1 100644
--- a/expressions/predicate/ComparisonPredicate.hpp
+++ b/expressions/predicate/ComparisonPredicate.hpp
@@ -21,7 +21,9 @@
 #define QUICKSTEP_EXPRESSIONS_PREDICATE_COMPARISON_PREDICATE_HPP_
 
 #include <memory>
+#include <string>
 #include <utility>
+#include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/Expressions.pb.h"
@@ -35,6 +37,7 @@
 
 namespace quickstep {
 
+class Expression;
 class TupleIdSequence;
 class ValueAccessor;
 
@@ -137,6 +140,15 @@
    **/
   std::pair<bool, attribute_id> getAttributeFromAttributeLiteralComparison() const;
 
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<const Expression*> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<const Expression*>> *container_child_fields) const override;
+
  private:
   const Comparison &comparison_;
   std::unique_ptr<Scalar> left_operand_;
diff --git a/expressions/predicate/NegationPredicate.cpp b/expressions/predicate/NegationPredicate.cpp
index bee1c8d..92a8411 100644
--- a/expressions/predicate/NegationPredicate.cpp
+++ b/expressions/predicate/NegationPredicate.cpp
@@ -19,6 +19,9 @@
 
 #include "expressions/predicate/NegationPredicate.hpp"
 
+#include <string>
+#include <vector>
+
 #include "expressions/Expressions.pb.h"
 #include "expressions/predicate/Predicate.hpp"
 #include "storage/TupleIdSequence.hpp"
@@ -120,4 +123,22 @@
   }
 }
 
+void NegationPredicate::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<const Expression*> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<const Expression*>> *container_child_fields) const {
+  Predicate::getFieldStringItems(inline_field_names,
+                                 inline_field_values,
+                                 non_container_child_field_names,
+                                 non_container_child_fields,
+                                 container_child_field_names,
+                                 container_child_fields);
+
+  non_container_child_field_names->emplace_back("operand");
+  non_container_child_fields->emplace_back(operand_.get());
+}
+
 }  // namespace quickstep
diff --git a/expressions/predicate/NegationPredicate.hpp b/expressions/predicate/NegationPredicate.hpp
index 33c6df8..ec005ea 100644
--- a/expressions/predicate/NegationPredicate.hpp
+++ b/expressions/predicate/NegationPredicate.hpp
@@ -21,6 +21,8 @@
 #define QUICKSTEP_EXPRESSIONS_PREDICATE_NEGATION_PREDICATE_HPP_
 
 #include <memory>
+#include <string>
+#include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/Expressions.pb.h"
@@ -30,6 +32,7 @@
 
 namespace quickstep {
 
+class Expression;
 class TupleIdSequence;
 class ValueAccessor;
 
@@ -105,6 +108,15 @@
 
   bool getStaticResult() const override;
 
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<const Expression*> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<const Expression*>> *container_child_fields) const override;
+
  private:
   void initHelper();
 
diff --git a/expressions/predicate/Predicate.cpp b/expressions/predicate/Predicate.cpp
index 006e8f1..1f61d62 100644
--- a/expressions/predicate/Predicate.cpp
+++ b/expressions/predicate/Predicate.cpp
@@ -19,6 +19,9 @@
 
 #include "expressions/predicate/Predicate.hpp"
 
+#include <string>
+#include <vector>
+
 #include "storage/TupleIdSequence.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "utility/Macros.hpp"
@@ -57,4 +60,17 @@
   return result;
 }
 
+void Predicate::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<const Expression*> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<const Expression*>> *container_child_fields) const {
+  if (hasStaticResult()) {
+    inline_field_names->emplace_back("static_result");
+    inline_field_values->emplace_back(getStaticResult() ? "true" : "false");
+  }
+}
+
 }  // namespace quickstep
diff --git a/expressions/predicate/Predicate.hpp b/expressions/predicate/Predicate.hpp
index 5fb3ef5..6a2ba6d 100644
--- a/expressions/predicate/Predicate.hpp
+++ b/expressions/predicate/Predicate.hpp
@@ -20,7 +20,12 @@
 #ifndef QUICKSTEP_EXPRESSIONS_PREDICATE_PREDICATE_HPP_
 #define QUICKSTEP_EXPRESSIONS_PREDICATE_PREDICATE_HPP_
 
+#include <string>
+#include <type_traits>
+#include <vector>
+
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/Expression.hpp"
 #include "expressions/Expressions.pb.h"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
@@ -39,7 +44,7 @@
 /**
  * @brief Base class for all predicates.
  **/
-class Predicate {
+class Predicate : public Expression {
  public:
   /**
    * @brief The possible types of predicates.
@@ -67,6 +72,11 @@
   virtual ~Predicate() {
   }
 
+  std::string getName() const override {
+    return kPredicateTypeNames[
+        static_cast<std::underlying_type<PredicateType>::type>(getPredicateType())];
+  }
+
   /**
    * @brief Serialize this predicate in Protocol Buffer form.
    *
@@ -189,6 +199,14 @@
   virtual bool getStaticResult() const;
 
  protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<const Expression*> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<const Expression*>> *container_child_fields) const override;
+
   Predicate() {
   }
 
diff --git a/expressions/predicate/PredicateWithList.cpp b/expressions/predicate/PredicateWithList.cpp
new file mode 100644
index 0000000..6b3d7a7
--- /dev/null
+++ b/expressions/predicate/PredicateWithList.cpp
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/predicate/PredicateWithList.hpp"
+
+#include <string>
+#include <vector>
+
+namespace quickstep {
+
+void PredicateWithList::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<const Expression*> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<const Expression*>> *container_child_fields) const {
+  Predicate::getFieldStringItems(inline_field_names,
+                                 inline_field_values,
+                                 non_container_child_field_names,
+                                 non_container_child_fields,
+                                 container_child_field_names,
+                                 container_child_fields);
+
+  container_child_field_names->emplace_back("static_operand_list");
+  container_child_fields->emplace_back();
+  for (const auto &static_operand : static_operand_list_) {
+    container_child_fields->back().emplace_back(&static_operand);
+  }
+
+  container_child_field_names->emplace_back("dynamic_operand_list");
+  container_child_fields->emplace_back();
+  for (const auto &dynamic_operand : dynamic_operand_list_) {
+    container_child_fields->back().emplace_back(&dynamic_operand);
+  }
+}
+
+}  // namespace quickstep
diff --git a/expressions/predicate/PredicateWithList.hpp b/expressions/predicate/PredicateWithList.hpp
index b1bf7e5..c6fb99d 100644
--- a/expressions/predicate/PredicateWithList.hpp
+++ b/expressions/predicate/PredicateWithList.hpp
@@ -20,6 +20,9 @@
 #ifndef QUICKSTEP_EXPRESSIONS_PREDICATE_PREDICATE_WITH_LIST_HPP_
 #define QUICKSTEP_EXPRESSIONS_PREDICATE_PREDICATE_WITH_LIST_HPP_
 
+#include <string>
+#include <vector>
+
 #include "expressions/predicate/Predicate.hpp"
 #include "utility/Macros.hpp"
 #include "utility/PtrList.hpp"
@@ -58,6 +61,14 @@
   }
 
  protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<const Expression*> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<const Expression*>> *container_child_fields) const override;
+
   PtrList<Predicate> static_operand_list_;
   PtrList<Predicate> dynamic_operand_list_;
 
diff --git a/expressions/scalar/CMakeLists.txt b/expressions/scalar/CMakeLists.txt
index 8f509da..6b52231 100644
--- a/expressions/scalar/CMakeLists.txt
+++ b/expressions/scalar/CMakeLists.txt
@@ -29,6 +29,9 @@
 add_library(quickstep_expressions_scalar_ScalarLiteral
             ScalarLiteral.cpp
             ScalarLiteral.hpp)
+add_library(quickstep_expressions_scalar_ScalarSharedExpression
+            ScalarSharedExpression.cpp
+            ScalarSharedExpression.hpp)
 add_library(quickstep_expressions_scalar_ScalarUnaryExpression
             ScalarUnaryExpression.cpp
             ScalarUnaryExpression.hpp)
@@ -36,9 +39,11 @@
 # Link dependencies:
 target_link_libraries(quickstep_expressions_scalar_Scalar
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_Expression
                       quickstep_expressions_Expressions_proto
                       quickstep_storage_StorageBlockInfo
                       quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVector
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_scalar_ScalarAttribute
                       quickstep_catalog_CatalogAttribute
@@ -65,6 +70,7 @@
                       quickstep_types_containers_ColumnVector
                       quickstep_types_operations_Operation_proto
                       quickstep_types_operations_binaryoperations_BinaryOperation
+                      quickstep_types_operations_binaryoperations_BinaryOperationID
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_scalar_ScalarCaseExpression
                       quickstep_catalog_CatalogTypedefs
@@ -92,6 +98,16 @@
                       quickstep_types_TypedValue_proto
                       quickstep_types_containers_ColumnVector
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_expressions_scalar_ScalarSharedExpression
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_expressions_Expressions_proto
+                      quickstep_expressions_scalar_Scalar
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_storage_ValueAccessor
+                      quickstep_types_TypedValue
+                      quickstep_types_containers_ColumnVector
+                      quickstep_utility_ColumnVectorCache
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_expressions_scalar_ScalarUnaryExpression
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_Expressions_proto
@@ -104,6 +120,7 @@
                       quickstep_types_containers_ColumnVector
                       quickstep_types_operations_Operation_proto
                       quickstep_types_operations_unaryoperations_UnaryOperation
+                      quickstep_types_operations_unaryoperations_UnaryOperationID
                       quickstep_utility_Macros)
 
 # Submodule all-in-one library:
@@ -114,6 +131,7 @@
                       quickstep_expressions_scalar_ScalarBinaryExpression
                       quickstep_expressions_scalar_ScalarCaseExpression
                       quickstep_expressions_scalar_ScalarLiteral
+                      quickstep_expressions_scalar_ScalarSharedExpression
                       quickstep_expressions_scalar_ScalarUnaryExpression)
 
 # Tests:
diff --git a/expressions/scalar/Scalar.cpp b/expressions/scalar/Scalar.cpp
index a1c436c..c552d8b 100644
--- a/expressions/scalar/Scalar.cpp
+++ b/expressions/scalar/Scalar.cpp
@@ -19,6 +19,9 @@
 
 #include "expressions/scalar/Scalar.hpp"
 
+#include <string>
+#include <vector>
+
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -28,6 +31,7 @@
   "Attribute",
   "UnaryExpression",
   "BinaryExpression",
+  "SharedExpression",
   "SimpleCase"
 };
 
@@ -35,4 +39,15 @@
   FATAL_ERROR("Called getStaticValue() on a Scalar which does not have a static value");
 }
 
+void Scalar::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<const Expression*> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<const Expression*>> *container_child_fields) const {
+  inline_field_names->emplace_back("result_type");
+  inline_field_values->emplace_back(type_.getName());
+}
+
 }  // namespace quickstep
diff --git a/expressions/scalar/Scalar.hpp b/expressions/scalar/Scalar.hpp
index 2db850a..472b71c 100644
--- a/expressions/scalar/Scalar.hpp
+++ b/expressions/scalar/Scalar.hpp
@@ -20,18 +20,22 @@
 #ifndef QUICKSTEP_EXPRESSIONS_SCALAR_SCALAR_HPP_
 #define QUICKSTEP_EXPRESSIONS_SCALAR_SCALAR_HPP_
 
+#include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
 #include "catalog/CatalogTypedefs.hpp"
+#include "expressions/Expression.hpp"
 #include "expressions/Expressions.pb.h"
 #include "storage/StorageBlockInfo.hpp"
 #include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
-class ColumnVector;
+class ColumnVectorCache;
 class Type;
 class ValueAccessor;
 
@@ -44,7 +48,7 @@
 /**
  * @brief Base class for anything which evaluates to a Scalar value.
  **/
-class Scalar {
+class Scalar : public Expression {
  public:
   /**
    * @brief The possible provenance of Scalar values.
@@ -55,6 +59,7 @@
     kUnaryExpression,
     kBinaryExpression,
     kCaseExpression,
+    kSharedExpression,
     kNumScalarDataSources  // Not a real ScalarDataSource, exists for counting purposes.
   };
 
@@ -70,6 +75,11 @@
   virtual ~Scalar() {
   }
 
+  std::string getName() const override {
+    return kScalarDataSourceNames[
+        static_cast<std::underlying_type<ScalarDataSource>::type>(getDataSource())];
+  }
+
   /**
    * @brief Serialize this scalar in Protocol Buffer form.
    *
@@ -197,11 +207,14 @@
    *        fast-path (non-scan) evaluation of Predicates embedded in this
    *        scalar (e.g. WHEN predicates in ScalarCaseExpression). May be NULL,
    *        in which case scan-based evaluation is always used.
+   * @param cv_cache If non-NULL, used as memoization table that is updated and
+   *        looked up during evaluation, for results of common subexpressions.
    * @return A ColumnVector of this Scalar's values for each tuple accesible
    *         via accessor.
    **/
-  virtual ColumnVector* getAllValues(ValueAccessor *accessor,
-                                     const SubBlocksReference *sub_blocks_ref) const = 0;
+  virtual ColumnVectorPtr getAllValues(ValueAccessor *accessor,
+                                       const SubBlocksReference *sub_blocks_ref,
+                                       ColumnVectorCache *cv_cache) const = 0;
 
   /**
    * @brief Get this Scalar's value for all specified joined tuples from two
@@ -215,19 +228,30 @@
    *        from the right relation.
    * @param joined_tuple_ids A series of pairs of tuple ids from the left and
    *        right relations that will be joined.
+   * @param cv_cache If non-NULL, used as memoization table that is updated and
+   *        looked up during evaluation, for results of common subexpressions.
    * @return A ColumnVector of this Scalar's values for all the joined tuples
    *         specified by joined_tuple_ids.
    **/
-  virtual ColumnVector* getAllValuesForJoin(
+  virtual ColumnVectorPtr getAllValuesForJoin(
       const relation_id left_relation_id,
       ValueAccessor *left_accessor,
       const relation_id right_relation_id,
       ValueAccessor *right_accessor,
-      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids) const = 0;
+      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+      ColumnVectorCache *cv_cache) const = 0;
 
  protected:
-  explicit Scalar(const Type &type) : type_(type) {
-  }
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<const Expression*> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<const Expression*>> *container_child_fields) const override;
+
+  explicit Scalar(const Type &type)
+      : Expression(), type_(type) {}
 
   const Type &type_;
 
diff --git a/expressions/scalar/ScalarAttribute.cpp b/expressions/scalar/ScalarAttribute.cpp
index cc42084..036eeba 100644
--- a/expressions/scalar/ScalarAttribute.cpp
+++ b/expressions/scalar/ScalarAttribute.cpp
@@ -19,6 +19,7 @@
 
 #include "expressions/scalar/ScalarAttribute.hpp"
 
+#include <string>
 #include <utility>
 #include <vector>
 
@@ -88,13 +89,15 @@
   return attribute_.getParent().getID();
 }
 
-ColumnVector* ScalarAttribute::getAllValues(ValueAccessor *accessor,
-                                            const SubBlocksReference *sub_blocks_ref) const {
+ColumnVectorPtr ScalarAttribute::getAllValues(
+    ValueAccessor *accessor,
+    const SubBlocksReference *sub_blocks_ref,
+    ColumnVectorCache *cv_cache) const {
   const attribute_id attr_id = attribute_.getID();
   const Type &result_type = attribute_.getType();
   return InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
       accessor,
-      [&attr_id, &result_type](auto *accessor) -> ColumnVector* {  // NOLINT(build/c++11)
+      [&attr_id, &result_type](auto *accessor) -> ColumnVectorPtr {  // NOLINT(build/c++11)
     if (NativeColumnVector::UsableForType(result_type)) {
       NativeColumnVector *result = new NativeColumnVector(result_type,
                                                           accessor->getNumTuples());
@@ -139,7 +142,7 @@
           }
         }
       }
-      return result;
+      return ColumnVectorPtr(result);
     } else {
       IndirectColumnVector *result = new IndirectColumnVector(result_type,
                                                               accessor->getNumTuples());
@@ -147,17 +150,18 @@
       while (accessor->next()) {
         result->appendTypedValue(accessor->getTypedValue(attr_id));
       }
-      return result;
+      return ColumnVectorPtr(result);
     }
   });
 }
 
-ColumnVector* ScalarAttribute::getAllValuesForJoin(
+ColumnVectorPtr ScalarAttribute::getAllValuesForJoin(
     const relation_id left_relation_id,
     ValueAccessor *left_accessor,
     const relation_id right_relation_id,
     ValueAccessor *right_accessor,
-    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids) const {
+    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+    ColumnVectorCache *cv_cache) const {
   DCHECK((attribute_.getParent().getID() == left_relation_id)
          || (attribute_.getParent().getID() == right_relation_id));
 
@@ -173,7 +177,7 @@
       [&joined_tuple_ids,
        &attr_id,
        &result_type,
-       &using_left_relation](auto *accessor) -> ColumnVector* {  // NOLINT(build/c++11)
+       &using_left_relation](auto *accessor) -> ColumnVectorPtr {  // NOLINT(build/c++11)
     if (NativeColumnVector::UsableForType(result_type)) {
       NativeColumnVector *result = new NativeColumnVector(result_type,
                                                           joined_tuple_ids.size());
@@ -196,7 +200,7 @@
                   using_left_relation ? joined_pair.first : joined_pair.second));
         }
       }
-      return result;
+      return ColumnVectorPtr(result);
     } else {
       IndirectColumnVector *result = new IndirectColumnVector(result_type,
                                                               joined_tuple_ids.size());
@@ -206,9 +210,27 @@
                   attr_id,
                   using_left_relation ? joined_pair.first : joined_pair.second));
       }
-      return result;
+      return ColumnVectorPtr(result);
     }
   });
 }
 
+void ScalarAttribute::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<const Expression*> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<const Expression*>> *container_child_fields) const {
+  Scalar::getFieldStringItems(inline_field_names,
+                              inline_field_values,
+                              non_container_child_field_names,
+                              non_container_child_fields,
+                              container_child_field_names,
+                              container_child_fields);
+
+  inline_field_names->emplace_back("attribute");
+  inline_field_values->emplace_back(std::to_string(attribute_.getID()));
+}
+
 }  // namespace quickstep
diff --git a/expressions/scalar/ScalarAttribute.hpp b/expressions/scalar/ScalarAttribute.hpp
index c6a41df..4d30fe9 100644
--- a/expressions/scalar/ScalarAttribute.hpp
+++ b/expressions/scalar/ScalarAttribute.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_EXPRESSIONS_SCALAR_SCALAR_ATTRIBUTE_HPP_
 #define QUICKSTEP_EXPRESSIONS_SCALAR_SCALAR_ATTRIBUTE_HPP_
 
+#include <string>
 #include <utility>
 #include <vector>
 
@@ -28,12 +29,13 @@
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
 class CatalogAttribute;
-class ColumnVector;
+class ColumnVectorCache;
 class ValueAccessor;
 
 struct SubBlocksReference;
@@ -77,21 +79,31 @@
 
   relation_id getRelationIdForValueAccessor() const override;
 
-  ColumnVector* getAllValues(ValueAccessor *accessor,
-                             const SubBlocksReference *sub_blocks_ref) const override;
+  ColumnVectorPtr getAllValues(ValueAccessor *accessor,
+                               const SubBlocksReference *sub_blocks_ref,
+                               ColumnVectorCache *cv_cache) const override;
 
-  ColumnVector* getAllValuesForJoin(
+  ColumnVectorPtr getAllValuesForJoin(
       const relation_id left_relation_id,
       ValueAccessor *left_accessor,
       const relation_id right_relation_id,
       ValueAccessor *right_accessor,
-      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids) const override;
+      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+      ColumnVectorCache *cv_cache) const override;
 
   const CatalogAttribute& getAttribute() const {
     return attribute_;
   }
 
  protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<const Expression*> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<const Expression*>> *container_child_fields) const override;
+
   const CatalogAttribute &attribute_;
 
  private:
diff --git a/expressions/scalar/ScalarBinaryExpression.cpp b/expressions/scalar/ScalarBinaryExpression.cpp
index 5fe6cde..b3568f8 100644
--- a/expressions/scalar/ScalarBinaryExpression.cpp
+++ b/expressions/scalar/ScalarBinaryExpression.cpp
@@ -21,6 +21,7 @@
 
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -33,6 +34,7 @@
 #include "types/containers/ColumnVector.hpp"
 #include "types/operations/Operation.pb.h"
 #include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
 
 #include "glog/logging.h"
 
@@ -101,13 +103,15 @@
   }
 }
 
-ColumnVector* ScalarBinaryExpression::getAllValues(
+ColumnVectorPtr ScalarBinaryExpression::getAllValues(
     ValueAccessor *accessor,
-    const SubBlocksReference *sub_blocks_ref) const {
+    const SubBlocksReference *sub_blocks_ref,
+    ColumnVectorCache *cv_cache) const {
   if (fast_operator_.get() == nullptr) {
-    return ColumnVector::MakeVectorOfValue(getType(),
-                                           static_value_,
-                                           accessor->getNumTuplesVirtual());
+    return ColumnVectorPtr(
+        ColumnVector::MakeVectorOfValue(getType(),
+                                        static_value_,
+                                        accessor->getNumTuplesVirtual()));
   } else {
     // NOTE(chasseur): We don't check if BOTH operands have a static value,
     // because if they did then this expression would also have a static value
@@ -117,35 +121,39 @@
       const attribute_id right_operand_attr_id
           = right_operand_->getAttributeIdForValueAccessor();
       if (right_operand_attr_id != -1) {
-        return fast_operator_->applyToStaticValueAndValueAccessor(
-            left_operand_->getStaticValue(),
-            accessor,
-            right_operand_attr_id);
+        return ColumnVectorPtr(
+            fast_operator_->applyToStaticValueAndValueAccessor(
+                left_operand_->getStaticValue(),
+                accessor,
+                right_operand_attr_id));
       }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
-      std::unique_ptr<ColumnVector> right_result(
-          right_operand_->getAllValues(accessor, sub_blocks_ref));
-      return fast_operator_->applyToStaticValueAndColumnVector(
-          left_operand_->getStaticValue(),
-          *right_result);
+      ColumnVectorPtr right_result(
+          right_operand_->getAllValues(accessor, sub_blocks_ref, cv_cache));
+      return ColumnVectorPtr(
+          fast_operator_->applyToStaticValueAndColumnVector(
+              left_operand_->getStaticValue(),
+              *right_result));
     } else if (right_operand_->hasStaticValue()) {
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
       const attribute_id left_operand_attr_id
           = left_operand_->getAttributeIdForValueAccessor();
       if (left_operand_attr_id != -1) {
-        return fast_operator_->applyToValueAccessorAndStaticValue(
-            accessor,
-            left_operand_attr_id,
-            right_operand_->getStaticValue());
+        return ColumnVectorPtr(
+            fast_operator_->applyToValueAccessorAndStaticValue(
+                accessor,
+                left_operand_attr_id,
+                right_operand_->getStaticValue()));
       }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
-      std::unique_ptr<ColumnVector> left_result(
-          left_operand_->getAllValues(accessor, sub_blocks_ref));
-      return fast_operator_->applyToColumnVectorAndStaticValue(
-          *left_result,
-          right_operand_->getStaticValue());
+      ColumnVectorPtr left_result(
+          left_operand_->getAllValues(accessor, sub_blocks_ref, cv_cache));
+      return ColumnVectorPtr(
+          fast_operator_->applyToColumnVectorAndStaticValue(
+              *left_result,
+              right_operand_->getStaticValue()));
     } else {
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
       const attribute_id left_operand_attr_id
@@ -155,44 +163,53 @@
 
       if (left_operand_attr_id != -1) {
         if (right_operand_attr_id != -1) {
-          return fast_operator_->applyToSingleValueAccessor(accessor,
-                                                            left_operand_attr_id,
-                                                            right_operand_attr_id);
+          return ColumnVectorPtr(
+              fast_operator_->applyToSingleValueAccessor(
+                  accessor,
+                  left_operand_attr_id,
+                  right_operand_attr_id));
         } else {
-          std::unique_ptr<ColumnVector> right_result(
-              right_operand_->getAllValues(accessor, sub_blocks_ref));
-          return fast_operator_->applyToValueAccessorAndColumnVector(accessor,
-                                                                     left_operand_attr_id,
-                                                                     *right_result);
+          ColumnVectorPtr right_result(
+              right_operand_->getAllValues(accessor, sub_blocks_ref, cv_cache));
+          return ColumnVectorPtr(
+              fast_operator_->applyToValueAccessorAndColumnVector(
+                  accessor,
+                  left_operand_attr_id,
+                  *right_result));
         }
       } else if (right_operand_attr_id != -1) {
-        std::unique_ptr<ColumnVector> left_result(
-            left_operand_->getAllValues(accessor, sub_blocks_ref));
-        return fast_operator_->applyToColumnVectorAndValueAccessor(*left_result,
-                                                                   accessor,
-                                                                   right_operand_attr_id);
+        ColumnVectorPtr left_result(
+            left_operand_->getAllValues(accessor, sub_blocks_ref, cv_cache));
+        return ColumnVectorPtr(
+            fast_operator_->applyToColumnVectorAndValueAccessor(
+                *left_result,
+                accessor,
+                right_operand_attr_id));
       }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
-      std::unique_ptr<ColumnVector> left_result(
-          left_operand_->getAllValues(accessor, sub_blocks_ref));
-      std::unique_ptr<ColumnVector> right_result(
-          right_operand_->getAllValues(accessor, sub_blocks_ref));
-      return fast_operator_->applyToColumnVectors(*left_result, *right_result);
+      ColumnVectorPtr left_result(
+          left_operand_->getAllValues(accessor, sub_blocks_ref, cv_cache));
+      ColumnVectorPtr right_result(
+          right_operand_->getAllValues(accessor, sub_blocks_ref, cv_cache));
+      return ColumnVectorPtr(
+          fast_operator_->applyToColumnVectors(*left_result, *right_result));
     }
   }
 }
 
-ColumnVector* ScalarBinaryExpression::getAllValuesForJoin(
+ColumnVectorPtr ScalarBinaryExpression::getAllValuesForJoin(
     const relation_id left_relation_id,
     ValueAccessor *left_accessor,
     const relation_id right_relation_id,
     ValueAccessor *right_accessor,
-    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids) const {
+    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+    ColumnVectorCache *cv_cache) const {
   if (fast_operator_.get() == nullptr) {
-    return ColumnVector::MakeVectorOfValue(getType(),
-                                           static_value_,
-                                           joined_tuple_ids.size());
+    return ColumnVectorPtr(
+        ColumnVector::MakeVectorOfValue(getType(),
+                                        static_value_,
+                                        joined_tuple_ids.size()));
   } else {
     if (left_operand_->hasStaticValue()) {
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_JOIN
@@ -207,24 +224,27 @@
         const bool using_left_relation = (right_operand_relation_id == left_relation_id);
         ValueAccessor *right_operand_accessor = using_left_relation ? left_accessor
                                                                     : right_accessor;
-        return fast_operator_->applyToStaticValueAndValueAccessorForJoin(
-            left_operand_->getStaticValue(),
-            right_operand_accessor,
-            using_left_relation,
-            right_operand_attr_id,
-            joined_tuple_ids);
+        return ColumnVectorPtr(
+            fast_operator_->applyToStaticValueAndValueAccessorForJoin(
+                left_operand_->getStaticValue(),
+                right_operand_accessor,
+                using_left_relation,
+                right_operand_attr_id,
+                joined_tuple_ids));
       }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_JOIN
 
-      std::unique_ptr<ColumnVector> right_result(
+      ColumnVectorPtr right_result(
           right_operand_->getAllValuesForJoin(left_relation_id,
                                               left_accessor,
                                               right_relation_id,
                                               right_accessor,
-                                              joined_tuple_ids));
-      return fast_operator_->applyToStaticValueAndColumnVector(
-          left_operand_->getStaticValue(),
-          *right_result);
+                                              joined_tuple_ids,
+                                              cv_cache));
+      return ColumnVectorPtr(
+          fast_operator_->applyToStaticValueAndColumnVector(
+              left_operand_->getStaticValue(),
+              *right_result));
     } else if (right_operand_->hasStaticValue()) {
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_JOIN
       const attribute_id left_operand_attr_id
@@ -238,24 +258,27 @@
         const bool using_left_relation = (left_operand_relation_id == left_relation_id);
         ValueAccessor *left_operand_accessor = using_left_relation ? left_accessor
                                                                    : right_accessor;
-        return fast_operator_->applyToValueAccessorAndStaticValueForJoin(
-            left_operand_accessor,
-            using_left_relation,
-            left_operand_attr_id,
-            right_operand_->getStaticValue(),
-            joined_tuple_ids);
+        return ColumnVectorPtr(
+            fast_operator_->applyToValueAccessorAndStaticValueForJoin(
+                left_operand_accessor,
+                using_left_relation,
+                left_operand_attr_id,
+                right_operand_->getStaticValue(),
+                joined_tuple_ids));
       }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_JOIN
 
-      std::unique_ptr<ColumnVector> left_result(
+      ColumnVectorPtr left_result(
           left_operand_->getAllValuesForJoin(left_relation_id,
                                              left_accessor,
                                              right_relation_id,
                                              right_accessor,
-                                             joined_tuple_ids));
-      return fast_operator_->applyToColumnVectorAndStaticValue(
-          *left_result,
-          right_operand_->getStaticValue());
+                                             joined_tuple_ids,
+                                             cv_cache));
+      return ColumnVectorPtr(
+          fast_operator_->applyToColumnVectorAndStaticValue(
+              *left_result,
+              right_operand_->getStaticValue()));
     } else {
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_JOIN
       const attribute_id left_operand_attr_id
@@ -284,28 +307,31 @@
               = (right_operand_relation_id == left_relation_id);
           ValueAccessor *right_operand_accessor = using_left_relation_for_right_operand ? left_accessor
                                                                                         : right_accessor;
-          return fast_operator_->applyToValueAccessorsForJoin(left_operand_accessor,
-                                                              using_left_relation_for_left_operand,
-                                                              left_operand_attr_id,
-                                                              right_operand_accessor,
-                                                              using_left_relation_for_right_operand,
-                                                              right_operand_attr_id,
-                                                              joined_tuple_ids);
+          return ColumnVectorPtr(
+              fast_operator_->applyToValueAccessorsForJoin(left_operand_accessor,
+                                                           using_left_relation_for_left_operand,
+                                                           left_operand_attr_id,
+                                                           right_operand_accessor,
+                                                           using_left_relation_for_right_operand,
+                                                           right_operand_attr_id,
+                                                           joined_tuple_ids));
         }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_JOIN_WITH_BINARY_EXPRESSIONS
-        std::unique_ptr<ColumnVector> right_result(
+        ColumnVectorPtr right_result(
             right_operand_->getAllValuesForJoin(left_relation_id,
                                                 left_accessor,
                                                 right_relation_id,
                                                 right_accessor,
-                                                joined_tuple_ids));
+                                                joined_tuple_ids,
+                                                cv_cache));
 
-        return fast_operator_->applyToValueAccessorAndColumnVectorForJoin(
-            left_operand_accessor,
-            using_left_relation_for_left_operand,
-            left_operand_attr_id,
-            *right_result,
-            joined_tuple_ids);
+        return ColumnVectorPtr(
+            fast_operator_->applyToValueAccessorAndColumnVectorForJoin(
+                left_operand_accessor,
+                using_left_relation_for_left_operand,
+                left_operand_attr_id,
+                *right_result,
+                joined_tuple_ids));
       } else if (right_operand_attr_id != -1) {
         const relation_id right_operand_relation_id
             = right_operand_->getRelationIdForValueAccessor();
@@ -317,34 +343,39 @@
         ValueAccessor *right_operand_accessor = using_left_relation_for_right_operand ? left_accessor
                                                                                       : right_accessor;
 
-        std::unique_ptr<ColumnVector> left_result(
+        ColumnVectorPtr left_result(
             left_operand_->getAllValuesForJoin(left_relation_id,
                                                left_accessor,
                                                right_relation_id,
                                                right_accessor,
-                                               joined_tuple_ids));
-        return fast_operator_->applyToColumnVectorAndValueAccessorForJoin(
-            *left_result,
-            right_operand_accessor,
-            using_left_relation_for_right_operand,
-            right_operand_attr_id,
-            joined_tuple_ids);
+                                               joined_tuple_ids,
+                                               cv_cache));
+        return ColumnVectorPtr(
+            fast_operator_->applyToColumnVectorAndValueAccessorForJoin(
+                *left_result,
+                right_operand_accessor,
+                using_left_relation_for_right_operand,
+                right_operand_attr_id,
+                joined_tuple_ids));
       }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_JOIN
 
-      std::unique_ptr<ColumnVector> left_result(
+      ColumnVectorPtr left_result(
           left_operand_->getAllValuesForJoin(left_relation_id,
                                              left_accessor,
                                              right_relation_id,
                                              right_accessor,
-                                             joined_tuple_ids));
-      std::unique_ptr<ColumnVector> right_result(
+                                             joined_tuple_ids,
+                                             cv_cache));
+      ColumnVectorPtr right_result(
           right_operand_->getAllValuesForJoin(left_relation_id,
                                               left_accessor,
                                               right_relation_id,
                                               right_accessor,
-                                              joined_tuple_ids));
-      return fast_operator_->applyToColumnVectors(*left_result, *right_result);
+                                              joined_tuple_ids,
+                                              cv_cache));
+      return ColumnVectorPtr(
+          fast_operator_->applyToColumnVectors(*left_result, *right_result));
     }
   }
 }
@@ -374,4 +405,38 @@
   }
 }
 
+void ScalarBinaryExpression::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<const Expression*> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<const Expression*>> *container_child_fields) const {
+  Scalar::getFieldStringItems(inline_field_names,
+                              inline_field_values,
+                              non_container_child_field_names,
+                              non_container_child_fields,
+                              container_child_field_names,
+                              container_child_fields);
+
+  if (fast_operator_ == nullptr) {
+    inline_field_names->emplace_back("static_value");
+    if (static_value_.isNull()) {
+      inline_field_values->emplace_back("NULL");
+    } else {
+      inline_field_values->emplace_back(type_.printValueToString(static_value_));
+    }
+  }
+
+  inline_field_names->emplace_back("operation");
+  inline_field_values->emplace_back(
+      kBinaryOperationNames[static_cast<std::underlying_type<BinaryOperationID>::type>(
+          operation_.getBinaryOperationID())]);
+
+  non_container_child_field_names->emplace_back("left_operand");
+  non_container_child_fields->emplace_back(left_operand_.get());
+  non_container_child_field_names->emplace_back("right_operand");
+  non_container_child_fields->emplace_back(right_operand_.get());
+}
+
 }  // namespace quickstep
diff --git a/expressions/scalar/ScalarBinaryExpression.hpp b/expressions/scalar/ScalarBinaryExpression.hpp
index c84792a..4ac1f62 100644
--- a/expressions/scalar/ScalarBinaryExpression.hpp
+++ b/expressions/scalar/ScalarBinaryExpression.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_EXPRESSIONS_SCALAR_SCALAR_BINARY_EXPRESSION_HPP_
 
 #include <memory>
+#include <string>
 #include <utility>
 #include <vector>
 
@@ -29,6 +30,7 @@
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
 #include "types/operations/binary_operations/BinaryOperation.hpp"
 #include "utility/Macros.hpp"
 
@@ -36,7 +38,7 @@
 
 namespace quickstep {
 
-class ColumnVector;
+class ColumnVectorCache;
 class ValueAccessor;
 
 struct SubBlocksReference;
@@ -97,15 +99,26 @@
     return static_value_;
   }
 
-  ColumnVector* getAllValues(ValueAccessor *accessor,
-                             const SubBlocksReference *sub_blocks_ref) const override;
+  ColumnVectorPtr getAllValues(ValueAccessor *accessor,
+                               const SubBlocksReference *sub_blocks_ref,
+                               ColumnVectorCache *cv_cache) const override;
 
-  ColumnVector* getAllValuesForJoin(
+  ColumnVectorPtr getAllValuesForJoin(
       const relation_id left_relation_id,
       ValueAccessor *left_accessor,
       const relation_id right_relation_id,
       ValueAccessor *right_accessor,
-      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids) const override;
+      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+      ColumnVectorCache *cv_cache) const override;
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<const Expression*> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<const Expression*>> *container_child_fields) const override;
 
  private:
   void initHelper(bool own_children);
diff --git a/expressions/scalar/ScalarCaseExpression.cpp b/expressions/scalar/ScalarCaseExpression.cpp
index c81f723..00a7710 100644
--- a/expressions/scalar/ScalarCaseExpression.cpp
+++ b/expressions/scalar/ScalarCaseExpression.cpp
@@ -21,6 +21,7 @@
 
 #include <cstddef>
 #include <memory>
+#include <string>
 #include <utility>
 #include <vector>
 
@@ -193,18 +194,21 @@
   }
 }
 
-ColumnVector* ScalarCaseExpression::getAllValues(
+ColumnVectorPtr ScalarCaseExpression::getAllValues(
     ValueAccessor *accessor,
-    const SubBlocksReference *sub_blocks_ref) const {
+    const SubBlocksReference *sub_blocks_ref,
+    ColumnVectorCache *cv_cache) const {
   return InvokeOnValueAccessorMaybeTupleIdSequenceAdapter(
       accessor,
-      [&](auto *accessor) -> ColumnVector* {  // NOLINT(build/c++11)
+      [&](auto *accessor) -> ColumnVectorPtr {  // NOLINT(build/c++11)
     if (has_static_value_) {
-      return ColumnVector::MakeVectorOfValue(type_,
-                                             static_value_,
-                                             accessor->getNumTuples());
+      return ColumnVectorPtr(
+          ColumnVector::MakeVectorOfValue(type_,
+                                          static_value_,
+                                          accessor->getNumTuples()));
     } else if (fixed_result_expression_ != nullptr) {
-      return fixed_result_expression_->getAllValues(accessor, sub_blocks_ref);
+      return fixed_result_expression_->getAllValues(
+          accessor, sub_blocks_ref, cv_cache);
     }
 
     const TupleIdSequence *accessor_sequence = accessor->getTupleIdSequence();
@@ -238,21 +242,23 @@
     }
 
     // Generate a ColumnVector of all the values for each case.
-    std::vector<std::unique_ptr<ColumnVector>> case_results;
+    std::vector<ColumnVectorPtr> case_results;
     for (std::vector<std::unique_ptr<TupleIdSequence>>::size_type case_idx = 0;
          case_idx < case_matches.size();
          ++case_idx) {
       std::unique_ptr<ValueAccessor> case_accessor(
           accessor->createSharedTupleIdSequenceAdapter(*case_matches[case_idx]));
       case_results.emplace_back(
-          result_expressions_[case_idx]->getAllValues(case_accessor.get(), sub_blocks_ref));
+          result_expressions_[case_idx]->getAllValues(
+              case_accessor.get(), sub_blocks_ref, cv_cache));
     }
 
-    std::unique_ptr<ColumnVector> else_results;
+    ColumnVectorPtr else_results;
     if (!else_matches->empty()) {
       std::unique_ptr<ValueAccessor> else_accessor(
           accessor->createSharedTupleIdSequenceAdapter(*else_matches));
-      else_results.reset(else_result_expression_->getAllValues(else_accessor.get(), sub_blocks_ref));
+      else_results = else_result_expression_->getAllValues(
+          else_accessor.get(), sub_blocks_ref, cv_cache);
     }
 
     // Multiplex per-case results into a single ColumnVector with values in the
@@ -262,17 +268,18 @@
         accessor_sequence,
         case_matches,
         *else_matches,
-        &case_results,
-        else_results.get());
+        case_results,
+        else_results);
   });
 }
 
-ColumnVector* ScalarCaseExpression::getAllValuesForJoin(
+ColumnVectorPtr ScalarCaseExpression::getAllValuesForJoin(
     const relation_id left_relation_id,
     ValueAccessor *left_accessor,
     const relation_id right_relation_id,
     ValueAccessor *right_accessor,
-    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids) const {
+    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+    ColumnVectorCache *cv_cache) const {
   // Slice 'joined_tuple_ids' apart by case.
   //
   // NOTE(chasseur): We use TupleIdSequence to keep track of the positions in
@@ -321,7 +328,7 @@
   }
 
   // Generate a ColumnVector of all the values for each case.
-  std::vector<std::unique_ptr<ColumnVector>> case_results;
+  std::vector<ColumnVectorPtr> case_results;
   for (std::vector<std::vector<std::pair<tuple_id, tuple_id>>>::size_type case_idx = 0;
        case_idx < case_matches.size();
        ++case_idx) {
@@ -330,22 +337,24 @@
         left_accessor,
         right_relation_id,
         right_accessor,
-        case_matches[case_idx]));
+        case_matches[case_idx],
+        cv_cache));
   }
 
-  std::unique_ptr<ColumnVector> else_results;
+  ColumnVectorPtr else_results;
   if (!else_positions.empty()) {
     std::vector<std::pair<tuple_id, tuple_id>> else_matches;
     for (tuple_id pos : else_positions) {
       else_matches.emplace_back(joined_tuple_ids[pos]);
     }
 
-    else_results.reset(else_result_expression_->getAllValuesForJoin(
+    else_results = else_result_expression_->getAllValuesForJoin(
         left_relation_id,
         left_accessor,
         right_relation_id,
         right_accessor,
-        else_matches));
+        else_matches,
+        cv_cache);
   }
 
   // Multiplex per-case results into a single ColumnVector with values in the
@@ -355,8 +364,8 @@
       nullptr,
       case_positions,
       else_positions,
-      &case_results,
-      else_results.get());
+      case_results,
+      else_results);
 }
 
 void ScalarCaseExpression::MultiplexNativeColumnVector(
@@ -420,15 +429,15 @@
 void ScalarCaseExpression::MultiplexIndirectColumnVector(
     const TupleIdSequence *source_sequence,
     const TupleIdSequence &case_matches,
-    IndirectColumnVector *case_result,
+    const IndirectColumnVector &case_result,
     IndirectColumnVector *output) {
   if (source_sequence == nullptr) {
     TupleIdSequence::const_iterator output_pos_it = case_matches.begin();
     for (std::size_t input_pos = 0;
-         input_pos < case_result->size();
+         input_pos < case_result.size();
          ++input_pos, ++output_pos_it) {
       output->positionalWriteTypedValue(*output_pos_it,
-                                        case_result->moveTypedValue(input_pos));
+                                        case_result.getTypedValue(input_pos));
     }
   } else {
     std::size_t input_pos = 0;
@@ -438,20 +447,20 @@
          ++output_pos, ++source_sequence_it) {
       if (case_matches.get(*source_sequence_it)) {
         output->positionalWriteTypedValue(output_pos,
-                                          case_result->moveTypedValue(input_pos++));
+                                          case_result.getTypedValue(input_pos++));
       }
     }
   }
 }
 
-ColumnVector* ScalarCaseExpression::multiplexColumnVectors(
+ColumnVectorPtr ScalarCaseExpression::multiplexColumnVectors(
     const std::size_t output_size,
     const TupleIdSequence *source_sequence,
     const std::vector<std::unique_ptr<TupleIdSequence>> &case_matches,
     const TupleIdSequence &else_matches,
-    std::vector<std::unique_ptr<ColumnVector>> *case_results,
-    ColumnVector *else_result) const {
-  DCHECK_EQ(case_matches.size(), case_results->size());
+    const std::vector<ColumnVectorPtr> &case_results,
+    const ColumnVectorPtr &else_result) const {
+  DCHECK_EQ(case_matches.size(), case_results.size());
 
   if (NativeColumnVector::UsableForType(type_)) {
     std::unique_ptr<NativeColumnVector> native_result(
@@ -461,12 +470,12 @@
     for (std::vector<std::unique_ptr<TupleIdSequence>>::size_type case_idx = 0;
          case_idx < case_matches.size();
          ++case_idx) {
-      DCHECK((*case_results)[case_idx]->isNative());
+      DCHECK(case_results[case_idx]->isNative());
       if (!case_matches[case_idx]->empty()) {
         MultiplexNativeColumnVector(
             source_sequence,
             *case_matches[case_idx],
-            static_cast<const NativeColumnVector&>(*(*case_results)[case_idx]),
+            static_cast<const NativeColumnVector&>(*case_results[case_idx]),
             native_result.get());
       }
     }
@@ -480,7 +489,7 @@
                                   native_result.get());
     }
 
-    return native_result.release();
+    return ColumnVectorPtr(native_result.release());
   } else {
     std::unique_ptr<IndirectColumnVector> indirect_result(
         new IndirectColumnVector(type_, output_size));
@@ -489,12 +498,12 @@
     for (std::vector<std::unique_ptr<TupleIdSequence>>::size_type case_idx = 0;
          case_idx < case_matches.size();
          ++case_idx) {
-      DCHECK(!(*case_results)[case_idx]->isNative());
+      DCHECK(!case_results[case_idx]->isNative());
       if (!case_matches[case_idx]->empty()) {
         MultiplexIndirectColumnVector(
             source_sequence,
             *case_matches[case_idx],
-            static_cast<IndirectColumnVector*>((*case_results)[case_idx].get()),
+            static_cast<const IndirectColumnVector&>(*case_results[case_idx]),
             indirect_result.get());
       }
     }
@@ -504,11 +513,52 @@
       DCHECK(!else_matches.empty());
       MultiplexIndirectColumnVector(source_sequence,
                                     else_matches,
-                                    static_cast<IndirectColumnVector*>(else_result),
+                                    static_cast<const IndirectColumnVector&>(*else_result),
                                     indirect_result.get());
     }
 
-    return indirect_result.release();
+    return ColumnVectorPtr(indirect_result.release());
+  }
+}
+
+void ScalarCaseExpression::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<const Expression*> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<const Expression*>> *container_child_fields) const {
+  Scalar::getFieldStringItems(inline_field_names,
+                              inline_field_values,
+                              non_container_child_field_names,
+                              non_container_child_fields,
+                              container_child_field_names,
+                              container_child_fields);
+
+  if (has_static_value_) {
+    inline_field_names->emplace_back("static_value");
+    if (static_value_.isNull()) {
+      inline_field_values->emplace_back("NULL");
+    } else {
+      inline_field_values->emplace_back(type_.printValueToString(static_value_));
+    }
+  }
+
+  container_child_field_names->emplace_back("when_predicates");
+  container_child_fields->emplace_back();
+  for (const auto &predicate : when_predicates_) {
+    container_child_fields->back().emplace_back(predicate.get());
+  }
+
+  container_child_field_names->emplace_back("result_expressions");
+  container_child_fields->emplace_back();
+  for (const auto &expression : result_expressions_) {
+    container_child_fields->back().emplace_back(expression.get());
+  }
+
+  if (else_result_expression_ != nullptr) {
+    non_container_child_field_names->emplace_back("else_result_expression");
+    non_container_child_fields->emplace_back(else_result_expression_.get());
   }
 }
 
diff --git a/expressions/scalar/ScalarCaseExpression.hpp b/expressions/scalar/ScalarCaseExpression.hpp
index e6809fa..3d0ed71 100644
--- a/expressions/scalar/ScalarCaseExpression.hpp
+++ b/expressions/scalar/ScalarCaseExpression.hpp
@@ -22,6 +22,7 @@
 
 #include <cstddef>
 #include <memory>
+#include <string>
 #include <utility>
 #include <vector>
 
@@ -31,15 +32,14 @@
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
 
 namespace quickstep {
 
-class ColumnVector;
-class IndirectColumnVector;
-class NativeColumnVector;
+class ColumnVectorCache;
 class TupleIdSequence;
 class Type;
 class ValueAccessor;
@@ -132,15 +132,26 @@
     }
   }
 
-  ColumnVector* getAllValues(ValueAccessor *accessor,
-                             const SubBlocksReference *sub_blocks_ref) const override;
+  ColumnVectorPtr getAllValues(ValueAccessor *accessor,
+                               const SubBlocksReference *sub_blocks_ref,
+                               ColumnVectorCache *cv_cache) const override;
 
-  ColumnVector* getAllValuesForJoin(
+  ColumnVectorPtr getAllValuesForJoin(
       const relation_id left_relation_id,
       ValueAccessor *left_accessor,
       const relation_id right_relation_id,
       ValueAccessor *right_accessor,
-      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids) const override;
+      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+      ColumnVectorCache *cv_cache) const override;
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<const Expression*> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<const Expression*>> *container_child_fields) const override;
 
  private:
   // Merge the values in the NativeColumnVector 'case_result' into '*output' at
@@ -158,7 +169,7 @@
   static void MultiplexIndirectColumnVector(
       const TupleIdSequence *source_sequence,
       const TupleIdSequence &case_matches,
-      IndirectColumnVector *case_result,
+      const IndirectColumnVector &case_result,
       IndirectColumnVector *output);
 
   // Create and return a new ColumnVector by multiplexing the ColumnVectors
@@ -171,13 +182,13 @@
   // the explicit WHEN clauses. Similarly, '*case_results' are the values
   // generated for the tuples matching each WHEN clause, and '*else_results'
   // are the values generated for the ELSE tuples.
-  ColumnVector* multiplexColumnVectors(
+  ColumnVectorPtr multiplexColumnVectors(
       const std::size_t output_size,
       const TupleIdSequence *source_sequence,
       const std::vector<std::unique_ptr<TupleIdSequence>> &case_matches,
       const TupleIdSequence &else_matches,
-      std::vector<std::unique_ptr<ColumnVector>> *case_results,
-      ColumnVector *else_result) const;
+      const std::vector<ColumnVectorPtr> &case_results,
+      const ColumnVectorPtr &else_result) const;
 
   std::vector<std::unique_ptr<Predicate>> when_predicates_;
   std::vector<std::unique_ptr<Scalar>> result_expressions_;
diff --git a/expressions/scalar/ScalarLiteral.cpp b/expressions/scalar/ScalarLiteral.cpp
index 48b5574..808953d 100644
--- a/expressions/scalar/ScalarLiteral.cpp
+++ b/expressions/scalar/ScalarLiteral.cpp
@@ -19,6 +19,7 @@
 
 #include "expressions/scalar/ScalarLiteral.hpp"
 
+#include <string>
 #include <utility>
 #include <vector>
 
@@ -47,24 +48,49 @@
   return new ScalarLiteral(internal_literal_, type_);
 }
 
-ColumnVector* ScalarLiteral::getAllValues(
+ColumnVectorPtr ScalarLiteral::getAllValues(
     ValueAccessor *accessor,
-    const SubBlocksReference *sub_blocks_ref) const {
-  return ColumnVector::MakeVectorOfValue(
-      type_,
-      internal_literal_,
-      accessor->getNumTuplesVirtual());
+    const SubBlocksReference *sub_blocks_ref,
+    ColumnVectorCache *cv_cache) const {
+  return ColumnVectorPtr(
+      ColumnVector::MakeVectorOfValue(type_,
+                                      internal_literal_,
+                                      accessor->getNumTuplesVirtual()));
 }
 
-ColumnVector* ScalarLiteral::getAllValuesForJoin(
+ColumnVectorPtr ScalarLiteral::getAllValuesForJoin(
     const relation_id left_relation_id,
     ValueAccessor *left_accessor,
     const relation_id right_relation_id,
     ValueAccessor *right_accessor,
-    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids) const {
-  return ColumnVector::MakeVectorOfValue(type_,
-                                         internal_literal_,
-                                         joined_tuple_ids.size());
+    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+    ColumnVectorCache *cv_cache) const {
+  return ColumnVectorPtr(
+      ColumnVector::MakeVectorOfValue(type_,
+                                      internal_literal_,
+                                      joined_tuple_ids.size()));
+}
+
+void ScalarLiteral::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<const Expression*> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<const Expression*>> *container_child_fields) const {
+  Scalar::getFieldStringItems(inline_field_names,
+                              inline_field_values,
+                              non_container_child_field_names,
+                              non_container_child_fields,
+                              container_child_field_names,
+                              container_child_fields);
+
+  inline_field_names->emplace_back("internal_literal");
+  if (internal_literal_.isNull()) {
+    inline_field_values->emplace_back("NULL");
+  } else {
+    inline_field_values->emplace_back(type_.printValueToString(internal_literal_));
+  }
 }
 
 }  // namespace quickstep
diff --git a/expressions/scalar/ScalarLiteral.hpp b/expressions/scalar/ScalarLiteral.hpp
index c7f5ceb..2a4c396 100644
--- a/expressions/scalar/ScalarLiteral.hpp
+++ b/expressions/scalar/ScalarLiteral.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_EXPRESSIONS_SCALAR_SCALAR_LITERAL_HPP_
 #define QUICKSTEP_EXPRESSIONS_SCALAR_SCALAR_LITERAL_HPP_
 
+#include <string>
 #include <utility>
 #include <vector>
 
@@ -28,11 +29,12 @@
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
 
-class ColumnVector;
+class ColumnVectorCache;
 class Type;
 class ValueAccessor;
 
@@ -101,15 +103,26 @@
     return internal_literal_;
   }
 
-  ColumnVector* getAllValues(ValueAccessor *accessor,
-                             const SubBlocksReference *sub_blocks_ref) const override;
+  ColumnVectorPtr getAllValues(ValueAccessor *accessor,
+                               const SubBlocksReference *sub_blocks_ref,
+                               ColumnVectorCache *cv_cache) const override;
 
-  ColumnVector* getAllValuesForJoin(
+  ColumnVectorPtr getAllValuesForJoin(
       const relation_id left_relation_id,
       ValueAccessor *left_accessor,
       const relation_id right_relation_id,
       ValueAccessor *right_accessor,
-      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids) const override;
+      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+      ColumnVectorCache *cv_cache) const override;
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<const Expression*> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<const Expression*>> *container_child_fields) const override;
 
  private:
   TypedValue internal_literal_;
diff --git a/expressions/scalar/ScalarSharedExpression.cpp b/expressions/scalar/ScalarSharedExpression.cpp
new file mode 100644
index 0000000..f97c60b
--- /dev/null
+++ b/expressions/scalar/ScalarSharedExpression.cpp
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "expressions/scalar/ScalarSharedExpression.hpp"
+
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/Expressions.pb.h"
+#include "storage/ValueAccessor.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "utility/ColumnVectorCache.hpp"
+
+namespace quickstep {
+
+struct SubBlocksReference;
+
+ScalarSharedExpression::ScalarSharedExpression(const int share_id,
+                                               Scalar *operand)
+    : Scalar(operand->getType()),
+      share_id_(share_id),
+      operand_(operand) {
+}
+
+serialization::Scalar ScalarSharedExpression::getProto() const {
+  serialization::Scalar proto;
+  proto.set_data_source(serialization::Scalar::SHARED_EXPRESSION);
+  proto.SetExtension(serialization::ScalarSharedExpression::share_id, share_id_);
+  proto.MutableExtension(serialization::ScalarSharedExpression::operand)
+      ->CopyFrom(operand_->getProto());
+
+  return proto;
+}
+
+Scalar* ScalarSharedExpression::clone() const {
+  return new ScalarSharedExpression(share_id_, operand_->clone());
+}
+
+TypedValue ScalarSharedExpression::getValueForSingleTuple(const ValueAccessor &accessor,
+                                                          const tuple_id tuple) const {
+  return operand_->getValueForSingleTuple(accessor, tuple);
+}
+
+TypedValue ScalarSharedExpression::getValueForJoinedTuples(
+    const ValueAccessor &left_accessor,
+    const relation_id left_relation_id,
+    const tuple_id left_tuple_id,
+    const ValueAccessor &right_accessor,
+    const relation_id right_relation_id,
+    const tuple_id right_tuple_id) const {
+  return operand_->getValueForJoinedTuples(left_accessor,
+                                           left_relation_id,
+                                           left_tuple_id,
+                                           right_accessor,
+                                           right_relation_id,
+                                           right_tuple_id);
+}
+
+ColumnVectorPtr ScalarSharedExpression::getAllValues(
+    ValueAccessor *accessor,
+    const SubBlocksReference *sub_blocks_ref,
+    ColumnVectorCache *cv_cache) const {
+  if (cv_cache == nullptr) {
+    return operand_->getAllValues(accessor, sub_blocks_ref, cv_cache);
+  } else {
+    ColumnVectorPtr result;
+    if (cv_cache->contains(share_id_)) {
+      result = cv_cache->get(share_id_);
+    } else {
+      result = operand_->getAllValues(accessor, sub_blocks_ref, cv_cache);
+      cv_cache->set(share_id_, result);
+    }
+    return result;
+  }
+}
+
+ColumnVectorPtr ScalarSharedExpression::getAllValuesForJoin(
+    const relation_id left_relation_id,
+    ValueAccessor *left_accessor,
+    const relation_id right_relation_id,
+    ValueAccessor *right_accessor,
+    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+    ColumnVectorCache *cv_cache) const {
+  if (cv_cache == nullptr) {
+    return operand_->getAllValuesForJoin(left_relation_id,
+                                         left_accessor,
+                                         right_relation_id,
+                                         right_accessor,
+                                         joined_tuple_ids,
+                                         cv_cache);
+  } else {
+    ColumnVectorPtr result;
+    if (cv_cache->contains(share_id_)) {
+      result = cv_cache->get(share_id_);
+    } else {
+      result = operand_->getAllValuesForJoin(left_relation_id,
+                                             left_accessor,
+                                             right_relation_id,
+                                             right_accessor,
+                                             joined_tuple_ids,
+                                             cv_cache);
+      cv_cache->set(share_id_, result);
+    }
+    return result;
+  }
+}
+
+void ScalarSharedExpression::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<const Expression*> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<const Expression*>> *container_child_fields) const {
+  inline_field_names->emplace_back("share_id");
+  inline_field_values->emplace_back(std::to_string(share_id_));
+
+  non_container_child_field_names->emplace_back("operand");
+  non_container_child_fields->emplace_back(operand_.get());
+}
+
+}  // namespace quickstep
diff --git a/expressions/scalar/ScalarSharedExpression.hpp b/expressions/scalar/ScalarSharedExpression.hpp
new file mode 100644
index 0000000..d5dddbc
--- /dev/null
+++ b/expressions/scalar/ScalarSharedExpression.hpp
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_EXPRESSIONS_SCALAR_SCALAR_SHARED_EXPRESSION_HPP_
+#define QUICKSTEP_EXPRESSIONS_SCALAR_SCALAR_SHARED_EXPRESSION_HPP_
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogTypedefs.hpp"
+#include "expressions/Expressions.pb.h"
+#include "expressions/scalar/Scalar.hpp"
+#include "storage/StorageBlockInfo.hpp"
+#include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+
+class ColumnVectorCache;
+class ValueAccessor;
+
+struct SubBlocksReference;
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief Scalars that represent common subexpressions whose results are cached
+ *        and shared.
+ **/
+class ScalarSharedExpression : public Scalar {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param share_id The unique integer identifier for each equivalence class of
+   *        common subexpressions.
+   * @param operand The underlying scalar subexpression.
+   **/
+  ScalarSharedExpression(const int share_id, Scalar *operand);
+
+  /**
+   * @brief Destructor.
+   **/
+  ~ScalarSharedExpression() override {
+  }
+
+  serialization::Scalar getProto() const override;
+
+  Scalar* clone() const override;
+
+  ScalarDataSource getDataSource() const override {
+    return kSharedExpression;
+  }
+
+  TypedValue getValueForSingleTuple(const ValueAccessor &accessor,
+                                    const tuple_id tuple) const override;
+
+  TypedValue getValueForJoinedTuples(
+      const ValueAccessor &left_accessor,
+      const relation_id left_relation_id,
+      const tuple_id left_tuple_id,
+      const ValueAccessor &right_accessor,
+      const relation_id right_relation_id,
+      const tuple_id right_tuple_id) const override;
+
+  bool hasStaticValue() const override {
+    return operand_->hasStaticValue();
+  }
+
+  const TypedValue& getStaticValue() const override {
+    return operand_->getStaticValue();
+  }
+
+  ColumnVectorPtr getAllValues(ValueAccessor *accessor,
+                               const SubBlocksReference *sub_blocks_ref,
+                               ColumnVectorCache *cv_cache) const override;
+
+  ColumnVectorPtr getAllValuesForJoin(
+      const relation_id left_relation_id,
+      ValueAccessor *left_accessor,
+      const relation_id right_relation_id,
+      ValueAccessor *right_accessor,
+      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+      ColumnVectorCache *cv_cache) const override;
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<const Expression*> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<const Expression*>> *container_child_fields) const override;
+
+ private:
+  const int share_id_;
+  std::unique_ptr<Scalar> operand_;
+
+  DISALLOW_COPY_AND_ASSIGN(ScalarSharedExpression);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_EXPRESSIONS_SCALAR_SCALAR_SHARED_EXPRESSION_HPP_
diff --git a/expressions/scalar/ScalarUnaryExpression.cpp b/expressions/scalar/ScalarUnaryExpression.cpp
index 72fdbe1..c51e38f 100644
--- a/expressions/scalar/ScalarUnaryExpression.cpp
+++ b/expressions/scalar/ScalarUnaryExpression.cpp
@@ -21,6 +21,7 @@
 
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -33,6 +34,7 @@
 #include "types/containers/ColumnVector.hpp"
 #include "types/operations/Operation.pb.h"
 #include "types/operations/unary_operations/UnaryOperation.hpp"
+#include "types/operations/unary_operations/UnaryOperationID.hpp"
 
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_JOIN
 #include "glog/logging.h"
@@ -91,36 +93,43 @@
   }
 }
 
-ColumnVector* ScalarUnaryExpression::getAllValues(
+ColumnVectorPtr ScalarUnaryExpression::getAllValues(
     ValueAccessor *accessor,
-    const SubBlocksReference *sub_blocks_ref) const {
+    const SubBlocksReference *sub_blocks_ref,
+    ColumnVectorCache *cv_cache) const {
   if (fast_operator_.get() == nullptr) {
-    return ColumnVector::MakeVectorOfValue(getType(),
-                                           static_value_,
-                                           accessor->getNumTuplesVirtual());
+    return ColumnVectorPtr(
+        ColumnVector::MakeVectorOfValue(getType(),
+                                        static_value_,
+                                        accessor->getNumTuplesVirtual()));
   } else {
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
     const attribute_id operand_attr_id = operand_->getAttributeIdForValueAccessor();
     if (operand_attr_id != -1) {
-      return fast_operator_->applyToValueAccessor(accessor, operand_attr_id);
+      return ColumnVectorPtr(
+          fast_operator_->applyToValueAccessor(accessor, operand_attr_id));
     }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION
 
-    std::unique_ptr<ColumnVector> operand_result(operand_->getAllValues(accessor, sub_blocks_ref));
-    return fast_operator_->applyToColumnVector(*operand_result);
+    ColumnVectorPtr operand_result(
+        operand_->getAllValues(accessor, sub_blocks_ref, cv_cache));
+    return ColumnVectorPtr(
+        fast_operator_->applyToColumnVector(*operand_result));
   }
 }
 
-ColumnVector* ScalarUnaryExpression::getAllValuesForJoin(
+ColumnVectorPtr ScalarUnaryExpression::getAllValuesForJoin(
     const relation_id left_relation_id,
     ValueAccessor *left_accessor,
     const relation_id right_relation_id,
     ValueAccessor *right_accessor,
-    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids) const {
+    const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+    ColumnVectorCache *cv_cache) const {
   if (fast_operator_.get() == nullptr) {
-    return ColumnVector::MakeVectorOfValue(getType(),
-                                           static_value_,
-                                           joined_tuple_ids.size());
+    return ColumnVectorPtr(
+        ColumnVector::MakeVectorOfValue(getType(),
+                                        static_value_,
+                                        joined_tuple_ids.size()));
   } else {
 #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_JOIN
     const attribute_id operand_attr_id = operand_->getAttributeIdForValueAccessor();
@@ -132,20 +141,23 @@
       const bool using_left_relation = (operand_relation_id == left_relation_id);
       ValueAccessor *operand_accessor = using_left_relation ? left_accessor
                                                             : right_accessor;
-      return fast_operator_->applyToValueAccessorForJoin(operand_accessor,
-                                                         using_left_relation,
-                                                         operand_attr_id,
-                                                         joined_tuple_ids);
+      return ColumnVectorPtr(
+          fast_operator_->applyToValueAccessorForJoin(operand_accessor,
+                                                      using_left_relation,
+                                                      operand_attr_id,
+                                                      joined_tuple_ids));
     }
 #endif  // QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_JOIN
 
-    std::unique_ptr<ColumnVector> operand_result(
+    ColumnVectorPtr operand_result(
         operand_->getAllValuesForJoin(left_relation_id,
                                       left_accessor,
                                       right_relation_id,
                                       right_accessor,
-                                      joined_tuple_ids));
-    return fast_operator_->applyToColumnVector(*operand_result);
+                                      joined_tuple_ids,
+                                      cv_cache));
+    return ColumnVectorPtr(
+        fast_operator_->applyToColumnVector(*operand_result));
   }
 }
 
@@ -166,4 +178,36 @@
   }
 }
 
+void ScalarUnaryExpression::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<const Expression*> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<const Expression*>> *container_child_fields) const {
+  Scalar::getFieldStringItems(inline_field_names,
+                              inline_field_values,
+                              non_container_child_field_names,
+                              non_container_child_fields,
+                              container_child_field_names,
+                              container_child_fields);
+
+  if (fast_operator_ == nullptr) {
+    inline_field_names->emplace_back("static_value");
+    if (static_value_.isNull()) {
+      inline_field_values->emplace_back("NULL");
+    } else {
+      inline_field_values->emplace_back(type_.printValueToString(static_value_));
+    }
+  }
+
+  inline_field_names->emplace_back("operation");
+  inline_field_values->emplace_back(
+      kUnaryOperationNames[static_cast<std::underlying_type<UnaryOperationID>::type>(
+          operation_.getUnaryOperationID())]);
+
+  non_container_child_field_names->emplace_back("operand");
+  non_container_child_fields->emplace_back(operand_.get());
+}
+
 }  // namespace quickstep
diff --git a/expressions/scalar/ScalarUnaryExpression.hpp b/expressions/scalar/ScalarUnaryExpression.hpp
index 608a842..52edea7 100644
--- a/expressions/scalar/ScalarUnaryExpression.hpp
+++ b/expressions/scalar/ScalarUnaryExpression.hpp
@@ -21,6 +21,7 @@
 #define QUICKSTEP_EXPRESSIONS_SCALAR_SCALAR_UNARY_EXPRESSION_HPP_
 
 #include <memory>
+#include <string>
 #include <utility>
 #include <vector>
 
@@ -36,7 +37,7 @@
 
 namespace quickstep {
 
-class ColumnVector;
+class ColumnVectorCache;
 class ValueAccessor;
 
 struct SubBlocksReference;
@@ -93,15 +94,26 @@
     return static_value_;
   }
 
-  ColumnVector* getAllValues(ValueAccessor *accessor,
-                             const SubBlocksReference *sub_blocks_ref) const override;
+  ColumnVectorPtr getAllValues(ValueAccessor *accessor,
+                               const SubBlocksReference *sub_blocks_ref,
+                               ColumnVectorCache *cv_cache) const override;
 
-  ColumnVector* getAllValuesForJoin(
+  ColumnVectorPtr getAllValuesForJoin(
       const relation_id left_relation_id,
       ValueAccessor *left_accessor,
       const relation_id right_relation_id,
       ValueAccessor *right_accessor,
-      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids) const override;
+      const std::vector<std::pair<tuple_id, tuple_id>> &joined_tuple_ids,
+      ColumnVectorCache *cv_cache) const override;
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<const Expression*> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<const Expression*>> *container_child_fields) const override;
 
  private:
   void initHelper(bool own_children);
diff --git a/expressions/scalar/tests/ScalarCaseExpression_unittest.cpp b/expressions/scalar/tests/ScalarCaseExpression_unittest.cpp
index 2d1064b..2de9e84 100644
--- a/expressions/scalar/tests/ScalarCaseExpression_unittest.cpp
+++ b/expressions/scalar/tests/ScalarCaseExpression_unittest.cpp
@@ -223,8 +223,8 @@
       new ScalarLiteral(TypedValue(kVarChar, kThirdLawString, std::strlen(kThirdLawString) + 1),
                         varchar_type));
 
-  std::unique_ptr<ColumnVector> result_cv(
-      case_expr.getAllValues(&sample_data_value_accessor_, nullptr));
+  ColumnVectorPtr result_cv(
+      case_expr.getAllValues(&sample_data_value_accessor_, nullptr, nullptr));
   ASSERT_FALSE(result_cv->isNative());
   const IndirectColumnVector &indirect_result_cv
       = static_cast<const IndirectColumnVector&>(*result_cv);
@@ -308,8 +308,8 @@
       new ScalarLiteral(TypedValue(kVarChar, kThirdLawString, std::strlen(kThirdLawString) + 1),
                         varchar_type));
 
-  std::unique_ptr<ColumnVector> result_cv(
-      case_expr.getAllValues(filtered_accessor.get(), nullptr));
+  ColumnVectorPtr result_cv(
+      case_expr.getAllValues(filtered_accessor.get(), nullptr, nullptr));
   ASSERT_FALSE(result_cv->isNative());
   const IndirectColumnVector &indirect_result_cv
       = static_cast<const IndirectColumnVector&>(*result_cv);
@@ -380,8 +380,8 @@
       new ScalarLiteral(TypedValue(kVarChar, kThirdLawString, std::strlen(kThirdLawString) + 1),
                         varchar_type));
 
-  std::unique_ptr<ColumnVector> result_cv(
-      case_expr.getAllValues(&sample_data_value_accessor_, nullptr));
+  ColumnVectorPtr result_cv(
+      case_expr.getAllValues(&sample_data_value_accessor_, nullptr, nullptr));
   ASSERT_FALSE(result_cv->isNative());
   const IndirectColumnVector &indirect_result_cv
       = static_cast<const IndirectColumnVector&>(*result_cv);
@@ -474,8 +474,8 @@
           new ScalarLiteral(TypedValue(5), int_type),
           new ScalarAttribute(*sample_relation_->getAttributeById(0))));
 
-  std::unique_ptr<ColumnVector> result_cv(
-      case_expr.getAllValues(&sample_data_value_accessor_, nullptr));
+  ColumnVectorPtr result_cv(
+      case_expr.getAllValues(&sample_data_value_accessor_, nullptr, nullptr));
   ASSERT_TRUE(result_cv->isNative());
   const NativeColumnVector &native_result_cv
       = static_cast<const NativeColumnVector&>(*result_cv);
@@ -597,8 +597,8 @@
           new ScalarLiteral(TypedValue(5), int_type),
           new ScalarAttribute(*sample_relation_->getAttributeById(0))));
 
-  std::unique_ptr<ColumnVector> result_cv(
-      case_expr.getAllValues(filtered_accessor.get(), nullptr));
+  ColumnVectorPtr result_cv(
+      case_expr.getAllValues(filtered_accessor.get(), nullptr, nullptr));
   ASSERT_TRUE(result_cv->isNative());
   const NativeColumnVector &native_result_cv
       = static_cast<const NativeColumnVector&>(*result_cv);
@@ -707,8 +707,8 @@
           new ScalarLiteral(TypedValue(5), int_type),
           new ScalarAttribute(*sample_relation_->getAttributeById(0))));
 
-  std::unique_ptr<ColumnVector> result_cv(
-      case_expr.getAllValues(&sample_data_value_accessor_, nullptr));
+  ColumnVectorPtr result_cv(
+      case_expr.getAllValues(&sample_data_value_accessor_, nullptr, nullptr));
   ASSERT_TRUE(result_cv->isNative());
   const NativeColumnVector &native_result_cv
       = static_cast<const NativeColumnVector&>(*result_cv);
@@ -827,8 +827,8 @@
           new ScalarLiteral(TypedValue(5), int_type),
           new ScalarAttribute(*sample_relation_->getAttributeById(0))));
 
-  std::unique_ptr<ColumnVector> result_cv(
-      case_expr.getAllValues(filtered_accessor.get(), nullptr));
+  ColumnVectorPtr result_cv(
+      case_expr.getAllValues(filtered_accessor.get(), nullptr, nullptr));
   ASSERT_TRUE(result_cv->isNative());
   const NativeColumnVector &native_result_cv
       = static_cast<const NativeColumnVector&>(*result_cv);
@@ -929,12 +929,13 @@
     joined_tuple_ids.emplace_back(tuple_num, 1);
   }
 
-  std::unique_ptr<ColumnVector> result_cv(case_expr.getAllValuesForJoin(
+  ColumnVectorPtr result_cv(case_expr.getAllValuesForJoin(
       0,
       &sample_data_value_accessor_,
       1,
       &other_accessor,
-      joined_tuple_ids));
+      joined_tuple_ids,
+      nullptr));
   ASSERT_TRUE(result_cv->isNative());
   const NativeColumnVector &native_result_cv
       = static_cast<const NativeColumnVector&>(*result_cv);
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 08b6467..c969f16 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -214,12 +214,15 @@
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_rules_AttachLIPFilters
+                      quickstep_queryoptimizer_rules_CollapseSelection
+                      quickstep_queryoptimizer_rules_ExtractCommonSubexpression
                       quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_InjectJoinFilters
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownLowCostDisjunctivePredicate
                       quickstep_queryoptimizer_rules_ReduceGroupByAttributes
                       quickstep_queryoptimizer_rules_ReorderColumns
+                      quickstep_queryoptimizer_rules_ReuseAggregateExpressions
                       quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
                       quickstep_queryoptimizer_rules_SwapProbeBuild
                       quickstep_queryoptimizer_strategy_Aggregate
diff --git a/query_optimizer/PhysicalGenerator.cpp b/query_optimizer/PhysicalGenerator.cpp
index ac51c31..5f3b1b7 100644
--- a/query_optimizer/PhysicalGenerator.cpp
+++ b/query_optimizer/PhysicalGenerator.cpp
@@ -27,12 +27,15 @@
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/rules/AttachLIPFilters.hpp"
+#include "query_optimizer/rules/CollapseSelection.hpp"
+#include "query_optimizer/rules/ExtractCommonSubexpression.hpp"
 #include "query_optimizer/rules/FuseAggregateJoin.hpp"
 #include "query_optimizer/rules/InjectJoinFilters.hpp"
 #include "query_optimizer/rules/PruneColumns.hpp"
 #include "query_optimizer/rules/PushDownLowCostDisjunctivePredicate.hpp"
 #include "query_optimizer/rules/ReduceGroupByAttributes.hpp"
 #include "query_optimizer/rules/ReorderColumns.hpp"
+#include "query_optimizer/rules/ReuseAggregateExpressions.hpp"
 #include "query_optimizer/rules/StarSchemaHashJoinOrderOptimization.hpp"
 #include "query_optimizer/rules/SwapProbeBuild.hpp"
 #include "query_optimizer/strategy/Aggregate.hpp"
@@ -121,12 +124,6 @@
   std::vector<std::unique_ptr<Rule<P::Physical>>> rules;
   rules.emplace_back(new PruneColumns());
 
-  // TODO(jianqiao): It is possible for PushDownLowCostDisjunctivePredicate to
-  // generate two chaining Selection nodes that can actually be fused into one.
-  // Note that currently it is okay to have the two Selections because they are
-  // applied to a small cardinality stored relation, which is very light-weight.
-  // However it is better to have a FuseSelection optimization (or even a more
-  // general FusePhysical optimization) in the future.
   rules.emplace_back(new PushDownLowCostDisjunctivePredicate());
 
   rules.emplace_back(new ReduceGroupByAttributes(optimizer_context_));
@@ -146,11 +143,31 @@
     rules.emplace_back(new ReorderColumns());
   }
 
+  // This optimization pass eliminates duplicate aggregates and converts AVG to
+  // SUM/COUNT if appropriate. Note that this optimization needs to be done before
+  // ExtractCommonSubexpression.
+  rules.emplace_back(new ReuseAggregateExpressions(optimizer_context_));
+
   rules.emplace_back(new FuseAggregateJoin());
 
-  // NOTE(jianqiao): Adding rules after InjectJoinFilters (or AttachLIPFilters) requires
-  // extra handling of LIPFilterConfiguration for transformed nodes. So currently it is
-  // suggested that all the new rules be placed before this point.
+  // Some of the optimization passes (e.g. PushDownLowCostDisjunctivePredicate
+  // and ReuseAggregateExpressions) might add extra Selection nodes and extra
+  // projection columns for their convenience. So we collapse Selection nodes
+  // and prune unnecessary columns here.
+  rules.emplace_back(new CollapseSelection());
+  rules.emplace_back(new PruneColumns());
+
+  // This optimization pass identifies common subexpressions and wraps them with
+  // CommonSubexpression nodes, where identical CommonSubexpression nodes share
+  // a same unique integer ID. Later in the backend we use memoization tables to
+  // memorize the result column vectors for each ID so that each group has its
+  // common subexpression evaluated only once.
+  rules.emplace_back(new ExtractCommonSubexpression(optimizer_context_));
+
+  // NOTE(jianqiao): Adding rules after InjectJoinFilters (or AttachLIPFilters)
+  // requires extra handling of LIPFilterConfiguration for transformed nodes.
+  // So currently it is suggested that all the new rules be placed before this
+  // point.
   if (FLAGS_use_filter_joins) {
     rules.emplace_back(new InjectJoinFilters());
   }
diff --git a/query_optimizer/expressions/AttributeReference.cpp b/query_optimizer/expressions/AttributeReference.cpp
index f0e49d4..facfb39 100644
--- a/query_optimizer/expressions/AttributeReference.cpp
+++ b/query_optimizer/expressions/AttributeReference.cpp
@@ -19,6 +19,8 @@
 
 #include "query_optimizer/expressions/AttributeReference.hpp"
 
+#include <cstddef>
+#include <functional>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -26,6 +28,7 @@
 #include "expressions/scalar/ScalarAttribute.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
 
 #include "glog/logging.h"
 
@@ -57,6 +60,22 @@
   return new ::quickstep::ScalarAttribute(*found_it->second);
 }
 
+std::size_t AttributeReference::computeHash() const {
+  return std::hash<std::size_t>()(static_cast<std::size_t>(id()));
+}
+
+bool AttributeReference::equals(const ScalarPtr &other) const {
+  AttributeReferencePtr attr;
+  if (SomeAttributeReference::MatchesWithConditionalCast(other, &attr)) {
+    if (id() != attr->id()) {
+      return false;
+    }
+    DCHECK(type_.equals(attr->type_));
+    return true;
+  }
+  return false;
+}
+
 void AttributeReference::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/expressions/AttributeReference.hpp b/query_optimizer/expressions/AttributeReference.hpp
index f5207b1..5ace894 100644
--- a/query_optimizer/expressions/AttributeReference.hpp
+++ b/query_optimizer/expressions/AttributeReference.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_ATTRIBUTE_REFERENCE_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_ATTRIBUTE_REFERENCE_HPP_
 
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -30,6 +31,7 @@
 #include "query_optimizer/expressions/Expression.hpp"
 #include "query_optimizer/expressions/ExpressionType.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -88,6 +90,8 @@
   ::quickstep::Scalar* concretize(
       const std::unordered_map<ExprId, const CatalogAttribute*> &substitution_map) const override;
 
+  bool equals(const ScalarPtr &other) const override;
+
   /**
    * @brief Creates an immutable AttributReference.
    *
@@ -114,6 +118,8 @@
   }
 
  protected:
+  std::size_t computeHash() const override;
+
   void getFieldStringItems(
      std::vector<std::string> *inline_field_names,
      std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/expressions/BinaryExpression.cpp b/query_optimizer/expressions/BinaryExpression.cpp
index 446dd55..f49c6a2 100644
--- a/query_optimizer/expressions/BinaryExpression.cpp
+++ b/query_optimizer/expressions/BinaryExpression.cpp
@@ -19,6 +19,8 @@
 
 #include "query_optimizer/expressions/BinaryExpression.hpp"
 
+#include <algorithm>
+#include <cstddef>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -31,6 +33,7 @@
 #include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "types/operations/binary_operations/BinaryOperation.hpp"
 #include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "utility/HashPair.hpp"
 
 #include "glog/logging.h"
 
@@ -104,6 +107,40 @@
       right_->concretize(substitution_map));
 }
 
+std::size_t BinaryExpression::computeHash() const {
+  std::size_t left_hash = left_->hash();
+  std::size_t right_hash = right_->hash();
+
+  if (operation_.isCommutative() && left_hash > right_hash) {
+    std::swap(left_hash, right_hash);
+  }
+
+  return CombineHashes(
+      CombineHashes(static_cast<std::size_t>(ExpressionType::kBinaryExpression),
+                    static_cast<std::size_t>(operation_.getBinaryOperationID())),
+      CombineHashes(left_hash, right_hash));
+}
+
+bool BinaryExpression::equals(const ScalarPtr &other) const {
+  BinaryExpressionPtr expr;
+  if (SomeBinaryExpression::MatchesWithConditionalCast(other, &expr) &&
+      &operation_ == &expr->operation_) {
+    ScalarPtr left = left_;
+    ScalarPtr right = right_;
+
+    if (operation_.isCommutative()) {
+      const bool self_order = (left_->hash() < right_->hash());
+      const bool other_order = (expr->left_->hash() < expr->right_->hash());
+      if (self_order ^ other_order) {
+        std::swap(left, right);
+      }
+    }
+
+    return left->equals(expr->left_) && right->equals(expr->right_);
+  }
+  return false;
+}
+
 void BinaryExpression::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/expressions/BinaryExpression.hpp b/query_optimizer/expressions/BinaryExpression.hpp
index 9b11ed1..6a37679 100644
--- a/query_optimizer/expressions/BinaryExpression.hpp
+++ b/query_optimizer/expressions/BinaryExpression.hpp
@@ -90,6 +90,8 @@
   ::quickstep::Scalar* concretize(
       const std::unordered_map<ExprId, const CatalogAttribute*> &substitution_map) const override;
 
+  bool equals(const ScalarPtr &other) const override;
+
   static BinaryExpressionPtr Create(const BinaryOperation &operation,
                                     const ScalarPtr &left,
                                     const ScalarPtr &right) {
@@ -97,6 +99,8 @@
   }
 
  protected:
+  std::size_t computeHash() const override;
+
   void getFieldStringItems(
       std::vector<std::string> *inline_field_names,
       std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/expressions/CMakeLists.txt b/query_optimizer/expressions/CMakeLists.txt
index 35fac90..3e7f8e4 100644
--- a/query_optimizer/expressions/CMakeLists.txt
+++ b/query_optimizer/expressions/CMakeLists.txt
@@ -21,7 +21,11 @@
 add_library(quickstep_queryoptimizer_expressions_AttributeReference AttributeReference.cpp AttributeReference.hpp)
 add_library(quickstep_queryoptimizer_expressions_BinaryExpression BinaryExpression.cpp BinaryExpression.hpp)
 add_library(quickstep_queryoptimizer_expressions_Cast Cast.cpp Cast.hpp)
-add_library(quickstep_queryoptimizer_expressions_ComparisonExpression ComparisonExpression.cpp
+add_library(quickstep_queryoptimizer_expressions_CommonSubexpression
+            CommonSubexpression.cpp
+            CommonSubexpression.hpp)
+add_library(quickstep_queryoptimizer_expressions_ComparisonExpression
+            ComparisonExpression.cpp
             ComparisonExpression.hpp)
 add_library(quickstep_queryoptimizer_expressions_Exists Exists.cpp Exists.hpp)
 add_library(quickstep_queryoptimizer_expressions_Expression ../../empty_src.cpp Expression.hpp)
@@ -43,7 +47,9 @@
 add_library(quickstep_queryoptimizer_expressions_SimpleCase SimpleCase.cpp SimpleCase.hpp)
 add_library(quickstep_queryoptimizer_expressions_SubqueryExpression SubqueryExpression.cpp SubqueryExpression.hpp)
 add_library(quickstep_queryoptimizer_expressions_UnaryExpression UnaryExpression.cpp UnaryExpression.hpp)
-add_library(quickstep_queryoptimizer_expressions_WindowAggregateFunction WindowAggregateFunction.cpp WindowAggregateFunction.hpp)
+add_library(quickstep_queryoptimizer_expressions_WindowAggregateFunction
+            WindowAggregateFunction.cpp
+            WindowAggregateFunction.hpp)
 
 # Link dependencies:
 target_link_libraries(quickstep_queryoptimizer_expressions_AggregateFunction
@@ -78,6 +84,8 @@
                       quickstep_queryoptimizer_expressions_Expression
                       quickstep_queryoptimizer_expressions_ExpressionType
                       quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_expressions_Scalar
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_expressions_BinaryExpression
                       glog
@@ -91,6 +99,7 @@
                       quickstep_queryoptimizer_expressions_Scalar
                       quickstep_types_operations_binaryoperations_BinaryOperation
                       quickstep_types_operations_binaryoperations_BinaryOperationID
+                      quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_expressions_Cast
                       glog
@@ -105,6 +114,18 @@
                       quickstep_queryoptimizer_expressions_Scalar
                       quickstep_types_Type
                       quickstep_types_operations_unaryoperations_NumericCastOperation
+                      quickstep_utility_HashPair
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_expressions_CommonSubexpression
+                      glog
+                      quickstep_expressions_scalar_ScalarSharedExpression
+                      quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_expressions_Expression
+                      quickstep_queryoptimizer_expressions_ExpressionType
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_expressions_Scalar
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_expressions_ComparisonExpression
                       glog
@@ -233,6 +254,7 @@
                       glog
                       quickstep_queryoptimizer_expressions_ExprId
                       quickstep_queryoptimizer_expressions_Expression
+                      quickstep_utility_HashError
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_expressions_ScalarLiteral
                       glog
@@ -242,9 +264,11 @@
                       quickstep_queryoptimizer_expressions_ExprId
                       quickstep_queryoptimizer_expressions_Expression
                       quickstep_queryoptimizer_expressions_ExpressionType
+                      quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_expressions_Scalar
                       quickstep_types_Type
                       quickstep_types_TypedValue
+                      quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_expressions_SearchedCase
                       quickstep_expressions_predicate_Predicate
@@ -272,12 +296,14 @@
                       quickstep_queryoptimizer_expressions_ExprId
                       quickstep_queryoptimizer_expressions_Expression
                       quickstep_queryoptimizer_expressions_ExpressionType
+                      quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_expressions_Predicate
                       quickstep_queryoptimizer_expressions_Scalar
                       quickstep_types_Type
                       quickstep_types_operations_comparisons_ComparisonFactory
                       quickstep_types_operations_comparisons_ComparisonID
                       quickstep_utility_Cast
+                      quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_expressions_SubqueryExpression
                       glog
@@ -301,6 +327,7 @@
                       quickstep_queryoptimizer_expressions_Scalar
                       quickstep_types_operations_unaryoperations_UnaryOperation
                       quickstep_types_operations_unaryoperations_UnaryOperationID
+                      quickstep_utility_HashPair
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_expressions_WindowAggregateFunction
                       glog
@@ -324,6 +351,7 @@
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_BinaryExpression
                       quickstep_queryoptimizer_expressions_Cast
+                      quickstep_queryoptimizer_expressions_CommonSubexpression
                       quickstep_queryoptimizer_expressions_ComparisonExpression
                       quickstep_queryoptimizer_expressions_Exists
                       quickstep_queryoptimizer_expressions_Expression
diff --git a/query_optimizer/expressions/Cast.cpp b/query_optimizer/expressions/Cast.cpp
index c0813c5..e6eb1bd 100644
--- a/query_optimizer/expressions/Cast.cpp
+++ b/query_optimizer/expressions/Cast.cpp
@@ -19,6 +19,7 @@
 
 #include "query_optimizer/expressions/Cast.hpp"
 
+#include <cstddef>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -33,6 +34,7 @@
 #include "query_optimizer/expressions/Scalar.hpp"
 #include "types/Type.hpp"
 #include "types/operations/unary_operations/NumericCastOperation.hpp"
+#include "utility/HashPair.hpp"
 
 #include "glog/logging.h"
 
@@ -55,6 +57,21 @@
                                                 operand_->concretize(substitution_map));
 }
 
+std::size_t Cast::computeHash() const {
+  return CombineHashes(
+      CombineHashes(static_cast<std::size_t>(ExpressionType::kCast),
+                    operand_->hash()),
+      static_cast<std::size_t>(target_type_.getTypeID()));
+}
+
+bool Cast::equals(const ScalarPtr &other) const {
+  CastPtr expr;
+  if (SomeCast::MatchesWithConditionalCast(other, &expr)) {
+    return operand_->equals(expr->operand_) && target_type_.equals(expr->target_type_);
+  }
+  return false;
+}
+
 void Cast::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/expressions/Cast.hpp b/query_optimizer/expressions/Cast.hpp
index ac5bd02..11be775 100644
--- a/query_optimizer/expressions/Cast.hpp
+++ b/query_optimizer/expressions/Cast.hpp
@@ -78,6 +78,8 @@
   ::quickstep::Scalar* concretize(
       const std::unordered_map<ExprId, const CatalogAttribute*> &substitution_map) const override;
 
+  bool equals(const ScalarPtr &other) const override;
+
   /**
    * @brief Creates a Cast expression that converts \p operand to \p target_type.
    *
@@ -90,6 +92,8 @@
   }
 
  protected:
+  std::size_t computeHash() const override;
+
   void getFieldStringItems(
       std::vector<std::string> *inline_field_names,
       std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/expressions/CommonSubexpression.cpp b/query_optimizer/expressions/CommonSubexpression.cpp
new file mode 100644
index 0000000..4b13a0e
--- /dev/null
+++ b/query_optimizer/expressions/CommonSubexpression.cpp
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/expressions/CommonSubexpression.hpp"
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "expressions/scalar/ScalarSharedExpression.hpp"
+#include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+namespace expressions {
+
+ExpressionPtr CommonSubexpression::copyWithNewChildren(
+    const std::vector<ExpressionPtr> &new_children) const {
+  DCHECK_EQ(new_children.size(), children().size());
+  DCHECK(SomeScalar::Matches(new_children[0]));
+  return CommonSubexpression::Create(
+      common_subexpression_id_,
+      std::static_pointer_cast<const Scalar>(new_children[0]));
+}
+
+::quickstep::Scalar* CommonSubexpression::concretize(
+    const std::unordered_map<ExprId, const CatalogAttribute*> &substitution_map) const {
+  return new ::quickstep::ScalarSharedExpression(
+      static_cast<int>(common_subexpression_id_),
+      operand_->concretize(substitution_map));
+}
+
+void CommonSubexpression::getFieldStringItems(
+    std::vector<std::string> *inline_field_names,
+    std::vector<std::string> *inline_field_values,
+    std::vector<std::string> *non_container_child_field_names,
+    std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+    std::vector<std::string> *container_child_field_names,
+    std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const {
+  inline_field_names->push_back("common_subexpression_id");
+  inline_field_values->push_back(std::to_string(common_subexpression_id_));
+
+  non_container_child_field_names->push_back("Operand");
+  non_container_child_fields->push_back(operand_);
+}
+
+}  // namespace expressions
+}  // namespace optimizer
+}  // namespace quickstep
diff --git a/query_optimizer/expressions/CommonSubexpression.hpp b/query_optimizer/expressions/CommonSubexpression.hpp
new file mode 100644
index 0000000..ce7589d
--- /dev/null
+++ b/query_optimizer/expressions/CommonSubexpression.hpp
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_COMMON_SUBEXPRESSION_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_COMMON_SUBEXPRESSION_HPP_
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+class Scalar;
+class Type;
+
+namespace optimizer {
+namespace expressions {
+
+/** \addtogroup OptimizerExpressions
+ *  @{
+ */
+
+class CommonSubexpression;
+typedef std::shared_ptr<const CommonSubexpression> CommonSubexpressionPtr;
+
+/**
+ * @brief A wrapper expression that represents a common subexpression.
+ */
+class CommonSubexpression : public Scalar {
+ public:
+  ExpressionType getExpressionType() const override {
+    return ExpressionType::kCommonSubexpression;
+  }
+
+  std::string getName() const override {
+    return "CommonSubexpression";
+  }
+
+  bool isConstant() const override {
+    return operand_->isConstant();
+  }
+
+  /**
+   * @return The unique ID for the equivalence class that this common subexpression
+   *         belongs to.
+   */
+  inline ExprId common_subexpression_id() const {
+    return common_subexpression_id_;
+  }
+
+  /**
+   * @return The operand that represents the underlying subexpression.
+   */
+  const ScalarPtr& operand() const {
+    return operand_;
+  }
+
+  const Type& getValueType() const override {
+    return operand_->getValueType();
+  }
+
+  ExpressionPtr copyWithNewChildren(
+      const std::vector<ExpressionPtr> &new_children) const override;
+
+  std::vector<AttributeReferencePtr> getReferencedAttributes() const override {
+    return operand_->getReferencedAttributes();
+  }
+
+  ::quickstep::Scalar* concretize(
+      const std::unordered_map<ExprId, const CatalogAttribute*> &substitution_map) const override;
+
+  /**
+   * @brief Creates an immutable CommonSubexpression.
+   *
+   * @param common_subexpression_id A unique ID for the equivalence class that
+   *        this common subexpressions belongs to.
+   * @param operand The operand that represents the underlying subexpression.
+   * @return An immutable CommonSubexpression.
+   */
+  static CommonSubexpressionPtr Create(const ExprId common_subexpression_id,
+                                       const ScalarPtr &operand) {
+    return CommonSubexpressionPtr(
+        new CommonSubexpression(common_subexpression_id, operand));
+  }
+
+ protected:
+  void getFieldStringItems(
+      std::vector<std::string> *inline_field_names,
+      std::vector<std::string> *inline_field_values,
+      std::vector<std::string> *non_container_child_field_names,
+      std::vector<OptimizerTreeBaseNodePtr> *non_container_child_fields,
+      std::vector<std::string> *container_child_field_names,
+      std::vector<std::vector<OptimizerTreeBaseNodePtr>> *container_child_fields) const override;
+
+ private:
+  CommonSubexpression(const ExprId common_subexpression_id,
+                      const ScalarPtr &operand)
+      : common_subexpression_id_(common_subexpression_id),
+        operand_(operand) {
+    addChild(operand);
+  }
+
+  ExprId common_subexpression_id_;
+  ScalarPtr operand_;
+
+  DISALLOW_COPY_AND_ASSIGN(CommonSubexpression);
+};
+
+/** @} */
+
+}  // namespace expressions
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_COMMON_SUBEXPRESSION_HPP_
diff --git a/query_optimizer/expressions/ExpressionType.hpp b/query_optimizer/expressions/ExpressionType.hpp
index 5008f1d..ba7f639 100644
--- a/query_optimizer/expressions/ExpressionType.hpp
+++ b/query_optimizer/expressions/ExpressionType.hpp
@@ -32,11 +32,12 @@
  * @brief Optimizer expression types.
  **/
 enum class ExpressionType {
-  kAggregateFunction,
+  kAggregateFunction = 0,
   kAlias,
   kAttributeReference,
   kBinaryExpression,
   kCast,
+  kCommonSubexpression,
   kComparisonExpression,
   kExists,
   kInTableQuery,
diff --git a/query_optimizer/expressions/ExpressionUtil.hpp b/query_optimizer/expressions/ExpressionUtil.hpp
index 6b8666e..29d90f0 100644
--- a/query_optimizer/expressions/ExpressionUtil.hpp
+++ b/query_optimizer/expressions/ExpressionUtil.hpp
@@ -85,9 +85,9 @@
 bool ContainsExpression(
     const std::vector<std::shared_ptr<const NamedExpressionType1>> &expressions,
     const std::shared_ptr<const NamedExpressionType2> &expression_to_check) {
-  for (const std::shared_ptr<const NamedExpressionType1> &expression :
-       expressions) {
-    if (expression->equals(expression_to_check)) {
+  for (const auto &expression : expressions) {
+    if (expression->id() == expression_to_check->id()) {
+      DCHECK(expression->getExpressionType() == expression_to_check->getExpressionType());
       return true;
     }
   }
diff --git a/query_optimizer/expressions/NamedExpression.hpp b/query_optimizer/expressions/NamedExpression.hpp
index 9de8005..6725567 100644
--- a/query_optimizer/expressions/NamedExpression.hpp
+++ b/query_optimizer/expressions/NamedExpression.hpp
@@ -69,19 +69,6 @@
    */
   inline const std::string& relation_name() const { return relation_name_; }
 
-  /**
-   * @brief Compares this named expression with \p other. Two named expressions
-   *        are equal if they have the same ExprId and are both Alias or
-   *        AttributeReference.
-   *
-   * @param other Another named expression to compare with.
-   * @return True if the named expression is equal to \p other.
-   */
-  inline bool equals(const NamedExpressionPtr &other) const {
-    return getExpressionType() == other->getExpressionType() &&
-           id_ == other->id();
-  }
-
  protected:
   /**
    * @brief Constructor.
diff --git a/query_optimizer/expressions/PatternMatcher.hpp b/query_optimizer/expressions/PatternMatcher.hpp
index 18d6b1c..e30a4d9 100644
--- a/query_optimizer/expressions/PatternMatcher.hpp
+++ b/query_optimizer/expressions/PatternMatcher.hpp
@@ -35,6 +35,7 @@
 class AttributeReference;
 class BinaryExpression;
 class Cast;
+class CommonSubexpression;
 class ComparisonExpression;
 class Count;
 class Exists;
@@ -50,6 +51,7 @@
 class PredicateLiteral;
 class Scalar;
 class ScalarLiteral;
+class SimpleCase;
 class Sum;
 class UnaryExpression;
 class WindowAggregateFunction;
@@ -145,16 +147,13 @@
                                       ExpressionType::kAttributeReference,
                                       ExpressionType::kBinaryExpression,
                                       ExpressionType::kCast,
-                                      ExpressionType::kComparisonExpression,
-                                      ExpressionType::kLogicalAnd,
-                                      ExpressionType::kLogicalNot,
-                                      ExpressionType::kLogicalOr,
-                                      ExpressionType::kPredicateLiteral,
+                                      ExpressionType::kCommonSubexpression,
                                       ExpressionType::kScalarLiteral,
                                       ExpressionType::kSearchedCase,
                                       ExpressionType::kSimpleCase,
                                       ExpressionType::kUnaryExpression>;
 using SomeScalarLiteral = SomeExpressionNode<ScalarLiteral, ExpressionType::kScalarLiteral>;
+using SomeSimpleCase = SomeExpressionNode<SimpleCase, ExpressionType::kSimpleCase>;
 using SomeUnaryExpression = SomeExpressionNode<UnaryExpression, ExpressionType::kUnaryExpression>;
 using SomeWindowAggregateFunction = SomeExpressionNode<WindowAggregateFunction,
                                                        ExpressionType::kWindowAggregateFunction>;
diff --git a/query_optimizer/expressions/Scalar.hpp b/query_optimizer/expressions/Scalar.hpp
index 4870118..a163b21 100644
--- a/query_optimizer/expressions/Scalar.hpp
+++ b/query_optimizer/expressions/Scalar.hpp
@@ -20,11 +20,13 @@
 #ifndef QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_SCALAR_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_SCALAR_HPP_
 
+#include <cstddef>
 #include <memory>
 #include <unordered_map>
 
 #include "query_optimizer/expressions/Expression.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
+#include "utility/HashError.hpp"
 #include "utility/Macros.hpp"
 
 namespace quickstep {
@@ -65,10 +67,49 @@
       const std::unordered_map<ExprId, const CatalogAttribute*>& substitution_map)
       const = 0;
 
+  /**
+   * @brief Check whether this scalar is semantically equivalent to \p other.
+   *
+   * @note The fact that two scalars are semantically equal brings more
+   *       optimization opportunities, e.g. common subexpression elimination.
+   *       Meanwhile, it is always safe to assume that two scalars are not equal.
+   *
+   * @return True if this scalar equals \p other; false otherwise.
+   */
+  virtual bool equals(const ScalarPtr &other) const {
+    return false;
+  }
+
+  /**
+   * @brief Get a hash of this scalar.
+   *
+   * @return A hash of this scalar.
+   */
+  std::size_t hash() const {
+    if (hash_cache_ == nullptr) {
+      hash_cache_ = std::make_unique<std::size_t>(computeHash());
+    }
+    return *hash_cache_;
+  }
+
  protected:
   Scalar() {}
 
+  /**
+   * @brief Compute a hash of this scalar.
+   *
+   * @note Override this method to actually compute the hash. Note that the
+   *       public method hash() is a caching wrapper for this method.
+   *
+   * @return A hash of this scalar.
+   */
+  virtual std::size_t computeHash() const {
+    throw HashNotSupported("Unsupported computeHash() in " + getName());
+  }
+
  private:
+  mutable std::unique_ptr<std::size_t> hash_cache_;
+
   DISALLOW_COPY_AND_ASSIGN(Scalar);
 };
 
diff --git a/query_optimizer/expressions/ScalarLiteral.cpp b/query_optimizer/expressions/ScalarLiteral.cpp
index d70c4cf..d2ab527 100644
--- a/query_optimizer/expressions/ScalarLiteral.cpp
+++ b/query_optimizer/expressions/ScalarLiteral.cpp
@@ -19,6 +19,7 @@
 
 #include "query_optimizer/expressions/ScalarLiteral.hpp"
 
+#include <cstddef>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -28,7 +29,9 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "types/Type.hpp"
+#include "utility/HashPair.hpp"
 
 #include "glog/logging.h"
 
@@ -51,6 +54,26 @@
   return new ::quickstep::ScalarLiteral(value_, value_type_);
 }
 
+std::size_t ScalarLiteral::computeHash() const {
+  std::size_t hash_code = static_cast<std::size_t>(ExpressionType::kScalarLiteral);
+  if (!value_.isNull()) {
+    hash_code = CombineHashes(hash_code, value_.getHash());
+  }
+  return hash_code;
+}
+
+bool ScalarLiteral::equals(const ScalarPtr &other) const {
+  ScalarLiteralPtr lit;
+  if (SomeScalarLiteral::MatchesWithConditionalCast(other, &lit) &&
+      value_type_.equals(lit->value_type_)) {
+    if (value_.isNull() || lit->value_.isNull()) {
+      return value_.isNull() && lit->value_.isNull();
+    }
+    return value_.fastEqualCheck(lit->value_);
+  }
+  return false;
+}
+
 void ScalarLiteral::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/expressions/ScalarLiteral.hpp b/query_optimizer/expressions/ScalarLiteral.hpp
index 8ebc41c..bff52bb 100644
--- a/query_optimizer/expressions/ScalarLiteral.hpp
+++ b/query_optimizer/expressions/ScalarLiteral.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_SCALAR_LITERAL_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_SCALAR_LITERAL_HPP_
 
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -81,6 +82,8 @@
   ::quickstep::Scalar* concretize(
       const std::unordered_map<ExprId, const CatalogAttribute*> &substitution_map) const override;
 
+  bool equals(const ScalarPtr &other) const override;
+
   /**
    * @brief Creates an immutable ScalarLiteral.
    * @param literal_value The literal value.
@@ -92,6 +95,8 @@
   }
 
  protected:
+  std::size_t computeHash() const override;
+
   void getFieldStringItems(
       std::vector<std::string> *inline_field_names,
       std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/expressions/SimpleCase.cpp b/query_optimizer/expressions/SimpleCase.cpp
index 454d7b9..ccdd8e5 100644
--- a/query_optimizer/expressions/SimpleCase.cpp
+++ b/query_optimizer/expressions/SimpleCase.cpp
@@ -31,12 +31,14 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ComparisonExpression.hpp"
 #include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/expressions/Scalar.hpp"
 #include "types/Type.hpp"
 #include "types/operations/comparisons/ComparisonID.hpp"
 #include "types/operations/comparisons/ComparisonFactory.hpp"
 #include "utility/Cast.hpp"
+#include "utility/HashPair.hpp"
 
 #include "glog/logging.h"
 
@@ -161,6 +163,50 @@
       else_result_expression.release());
 }
 
+std::size_t SimpleCase::computeHash() const {
+  std::size_t hash_code =
+      CombineHashes(static_cast<std::size_t>(ExpressionType::kSimpleCase),
+                    case_operand_->hash());
+  for (std::size_t i = 0; i < condition_operands_.size(); ++i) {
+    hash_code = CombineHashes(hash_code, condition_operands_[i]->hash());
+    hash_code = CombineHashes(hash_code, conditional_result_expressions_[i]->hash());
+  }
+  if (else_result_expression_ != nullptr) {
+    hash_code = CombineHashes(hash_code, else_result_expression_->hash());
+  }
+  return hash_code;
+}
+
+bool SimpleCase::equals(const ScalarPtr &other) const {
+  SimpleCasePtr expr;
+  if (!SomeSimpleCase::MatchesWithConditionalCast(other, &expr)) {
+    return false;
+  }
+  if (!case_operand_->equals(expr->case_operand_)) {
+    return false;
+  }
+  if (condition_operands_.size() != expr->condition_operands_.size()) {
+    return false;
+  }
+  for (std::size_t i = 0; i < condition_operands_.size(); ++i) {
+    if (!condition_operands_[i]->equals(expr->condition_operands_[i])
+        || !conditional_result_expressions_[i]->equals(
+                expr->conditional_result_expressions_[i])) {
+      return false;
+    }
+  }
+  if ((else_result_expression_ == nullptr
+       || expr->else_result_expression_ == nullptr)
+      && else_result_expression_ != expr->else_result_expression_) {
+    return false;
+  }
+  if (!else_result_expression_->equals(expr->else_result_expression_)) {
+    return false;
+  }
+  DCHECK(value_type_.equals(expr->value_type_));
+  return true;
+}
+
 void SimpleCase::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/expressions/SimpleCase.hpp b/query_optimizer/expressions/SimpleCase.hpp
index 897d87f..0820fa3 100644
--- a/query_optimizer/expressions/SimpleCase.hpp
+++ b/query_optimizer/expressions/SimpleCase.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_SIMPLE_CASE_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_SIMPLE_CASE_HPP_
 
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -110,6 +111,8 @@
   ::quickstep::Scalar* concretize(
       const std::unordered_map<ExprId, const CatalogAttribute*>& substitution_map) const override;
 
+  bool equals(const ScalarPtr &other) const override;
+
   /**
    * @brief Creates an immutable SimpleCase.
    *
@@ -136,6 +139,8 @@
   }
 
  protected:
+  std::size_t computeHash() const override;
+
   void getFieldStringItems(std::vector<std::string> *inline_field_names,
                            std::vector<std::string> *inline_field_values,
                            std::vector<std::string> *non_container_child_field_names,
diff --git a/query_optimizer/expressions/UnaryExpression.cpp b/query_optimizer/expressions/UnaryExpression.cpp
index b0fff62..b448553 100644
--- a/query_optimizer/expressions/UnaryExpression.cpp
+++ b/query_optimizer/expressions/UnaryExpression.cpp
@@ -19,6 +19,7 @@
 
 #include "query_optimizer/expressions/UnaryExpression.hpp"
 
+#include <cstddef>
 #include <string>
 #include <unordered_map>
 #include <vector>
@@ -31,6 +32,7 @@
 #include "query_optimizer/expressions/Scalar.hpp"
 #include "types/operations/unary_operations/UnaryOperation.hpp"
 #include "types/operations/unary_operations/UnaryOperationID.hpp"
+#include "utility/HashPair.hpp"
 
 #include "glog/logging.h"
 
@@ -56,6 +58,21 @@
       operation_, operand_->concretize(substitution_map));
 }
 
+std::size_t UnaryExpression::computeHash() const {
+  return CombineHashes(
+      CombineHashes(static_cast<std::size_t>(ExpressionType::kUnaryExpression),
+                    static_cast<std::size_t>(operation_.getUnaryOperationID())),
+      operand_->hash());
+}
+
+bool UnaryExpression::equals(const ScalarPtr &other) const {
+  UnaryExpressionPtr expr;
+  if (SomeUnaryExpression::MatchesWithConditionalCast(other, &expr)) {
+    return &operation_ == &expr->operation_ && operand_->equals(expr->operand_);
+  }
+  return false;
+}
+
 void UnaryExpression::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/expressions/UnaryExpression.hpp b/query_optimizer/expressions/UnaryExpression.hpp
index c4542d0..14201ff 100644
--- a/query_optimizer/expressions/UnaryExpression.hpp
+++ b/query_optimizer/expressions/UnaryExpression.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_UNARY_EXPRESSION_HPP_
 #define QUICKSTEP_QUERY_OPTIMIZER_EXPRESSIONS_UNARY_EXPRESSION_HPP_
 
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <unordered_map>
@@ -85,6 +86,8 @@
   ::quickstep::Scalar* concretize(
       const std::unordered_map<ExprId, const CatalogAttribute*> &substitution_map) const override;
 
+  bool equals(const ScalarPtr &other) const override;
+
   /**
    * @brief Creates an immutable UnaryExpression.
    *
@@ -99,6 +102,8 @@
   }
 
  protected:
+  std::size_t computeHash() const override;
+
   void getFieldStringItems(
       std::vector<std::string> *inline_field_names,
       std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index 6847951..36e5959 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -21,6 +21,10 @@
 add_library(quickstep_queryoptimizer_rules_AttachLIPFilters AttachLIPFilters.cpp AttachLIPFilters.hpp)
 add_library(quickstep_queryoptimizer_rules_BottomUpRule ../../empty_src.cpp BottomUpRule.hpp)
 add_library(quickstep_queryoptimizer_rules_CollapseProject CollapseProject.cpp CollapseProject.hpp)
+add_library(quickstep_queryoptimizer_rules_CollapseSelection CollapseSelection.cpp CollapseSelection.hpp)
+add_library(quickstep_queryoptimizer_rules_ExtractCommonSubexpression
+            ExtractCommonSubexpression.cpp
+            ExtractCommonSubexpression.hpp)
 add_library(quickstep_queryoptimizer_rules_FuseAggregateJoin FuseAggregateJoin.cpp FuseAggregateJoin.hpp)
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
 add_library(quickstep_queryoptimizer_rules_InjectJoinFilters InjectJoinFilters.cpp InjectJoinFilters.hpp)
@@ -34,6 +38,9 @@
             ReduceGroupByAttributes.cpp
             ReduceGroupByAttributes.hpp)
 add_library(quickstep_queryoptimizer_rules_ReorderColumns ReorderColumns.cpp ReorderColumns.hpp)
+add_library(quickstep_queryoptimizer_rules_ReuseAggregateExpressions
+            ReuseAggregateExpressions.cpp
+            ReuseAggregateExpressions.hpp)
 add_library(quickstep_queryoptimizer_rules_Rule ../../empty_src.cpp Rule.hpp)
 add_library(quickstep_queryoptimizer_rules_RuleHelper RuleHelper.cpp RuleHelper.hpp)
 add_library(quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
@@ -77,6 +84,34 @@
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_CollapseSelection
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_queryoptimizer_rules_RuleHelper
+                      quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_ExtractCommonSubexpression
+                      glog
+                      quickstep_queryoptimizer_OptimizerContext
+                      quickstep_queryoptimizer_expressions_AggregateFunction
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_CommonSubexpression
+                      quickstep_queryoptimizer_expressions_Expression
+                      quickstep_queryoptimizer_expressions_ExpressionType
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_expressions_Scalar
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_HashJoin
+                      quickstep_queryoptimizer_physical_NestedLoopsJoin
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_rules_Rule
+                      quickstep_utility_HashError
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
                       quickstep_queryoptimizer_expressions_AggregateFunction
@@ -200,6 +235,34 @@
                       quickstep_queryoptimizer_physical_TableReference
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_ReuseAggregateExpressions
+                      ${GFLAGS_LIB_NAME}
+                      glog
+                      quickstep_expressions_aggregation_AggregateFunction
+                      quickstep_expressions_aggregation_AggregateFunctionFactory
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_queryoptimizer_OptimizerContext
+                      quickstep_queryoptimizer_costmodel_StarSchemaSimpleCostModel
+                      quickstep_queryoptimizer_expressions_AggregateFunction
+                      quickstep_queryoptimizer_expressions_Alias
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_BinaryExpression
+                      quickstep_queryoptimizer_expressions_ExpressionType
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_Scalar
+                      quickstep_queryoptimizer_physical_Aggregate
+                      quickstep_queryoptimizer_physical_PatternMatcher
+                      quickstep_queryoptimizer_physical_Physical
+                      quickstep_queryoptimizer_physical_PhysicalType
+                      quickstep_queryoptimizer_physical_Selection
+                      quickstep_queryoptimizer_physical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_types_operations_binaryoperations_BinaryOperation
+                      quickstep_types_operations_binaryoperations_BinaryOperationFactory
+                      quickstep_types_operations_binaryoperations_BinaryOperationID
+                      quickstep_utility_HashError
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_Rule
                       glog
                       quickstep_utility_Macros)
@@ -311,6 +374,8 @@
                       quickstep_queryoptimizer_rules_AttachLIPFilters
                       quickstep_queryoptimizer_rules_BottomUpRule
                       quickstep_queryoptimizer_rules_CollapseProject
+                      quickstep_queryoptimizer_rules_CollapseSelection
+                      quickstep_queryoptimizer_rules_ExtractCommonSubexpression
                       quickstep_queryoptimizer_rules_FuseAggregateJoin
                       quickstep_queryoptimizer_rules_GenerateJoins
                       quickstep_queryoptimizer_rules_InjectJoinFilters
@@ -320,6 +385,7 @@
                       quickstep_queryoptimizer_rules_PushDownSemiAntiJoin
                       quickstep_queryoptimizer_rules_ReduceGroupByAttributes
                       quickstep_queryoptimizer_rules_ReorderColumns
+                      quickstep_queryoptimizer_rules_ReuseAggregateExpressions
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_queryoptimizer_rules_StarSchemaHashJoinOrderOptimization
diff --git a/query_optimizer/rules/CollapseSelection.cpp b/query_optimizer/rules/CollapseSelection.cpp
new file mode 100644
index 0000000..e5427b4
--- /dev/null
+++ b/query_optimizer/rules/CollapseSelection.cpp
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/CollapseSelection.hpp"
+
+#include <vector>
+
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/rules/RuleHelper.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+P::PhysicalPtr CollapseSelection::applyToNode(const P::PhysicalPtr &input) {
+  P::SelectionPtr selection;
+  P::SelectionPtr child_selection;
+
+  // TODO(jianqiao): Handle the case where filter predicates are present.
+  if (P::SomeSelection::MatchesWithConditionalCast(input, &selection) &&
+      P::SomeSelection::MatchesWithConditionalCast(selection->input(), &child_selection) &&
+      selection->filter_predicate() == nullptr &&
+      child_selection->filter_predicate() == nullptr) {
+    std::vector<E::NamedExpressionPtr> project_expressions =
+        selection->project_expressions();
+    PullUpProjectExpressions(child_selection->project_expressions(),
+                             {} /* non_project_expression_lists */,
+                             {&project_expressions} /* project_expression_lists */);
+    return P::Selection::Create(child_selection->input(),
+                                project_expressions,
+                                selection->filter_predicate());
+  }
+
+  return input;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep
diff --git a/query_optimizer/rules/CollapseSelection.hpp b/query_optimizer/rules/CollapseSelection.hpp
new file mode 100644
index 0000000..bc5e4a3
--- /dev/null
+++ b/query_optimizer/rules/CollapseSelection.hpp
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_COLLAPSE_SELECTION_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_COLLAPSE_SELECTION_HPP_
+
+#include <string>
+
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Merges nested Selections into one single Selection.
+ */
+class CollapseSelection : public BottomUpRule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  CollapseSelection() {}
+
+  std::string getName() const override {
+    return "CollapseSelection";
+  }
+
+ protected:
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &input) override;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(CollapseSelection);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_COLLAPSE_SELECTION_HPP_
diff --git a/query_optimizer/rules/ExtractCommonSubexpression.cpp b/query_optimizer/rules/ExtractCommonSubexpression.cpp
new file mode 100644
index 0000000..e3f996c
--- /dev/null
+++ b/query_optimizer/rules/ExtractCommonSubexpression.cpp
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/ExtractCommonSubexpression.hpp"
+
+#include <cstddef>
+#include <memory>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/CommonSubexpression.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/HashJoin.hpp"
+#include "query_optimizer/physical/NestedLoopsJoin.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "utility/HashError.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+ExtractCommonSubexpression::ExtractCommonSubexpression(
+    OptimizerContext *optimizer_context)
+    : optimizer_context_(optimizer_context) {
+  const std::vector<E::ExpressionType> homogeneous_expr_types = {
+      E::ExpressionType::kAlias,
+      E::ExpressionType::kAttributeReference,
+      E::ExpressionType::kBinaryExpression,
+      E::ExpressionType::kCast,
+      E::ExpressionType::kCommonSubexpression,
+      E::ExpressionType::kScalarLiteral,
+      E::ExpressionType::kUnaryExpression
+  };
+
+  for (const auto &expr_type : homogeneous_expr_types) {
+    homogeneous_expression_types_.emplace(expr_type);
+  }
+}
+
+P::PhysicalPtr ExtractCommonSubexpression::apply(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  return applyInternal(input);
+}
+
+P::PhysicalPtr ExtractCommonSubexpression::applyInternal(
+    const P::PhysicalPtr &input) {
+  // First process all child nodes.
+  std::vector<P::PhysicalPtr> new_children;
+  for (const auto &child : input->children()) {
+    new_children.emplace_back(applyInternal(child));
+  }
+
+  const P::PhysicalPtr node =
+      new_children == input->children()
+          ? input
+          : input->copyWithNewChildren(new_children);
+
+  // Process expressions of the current node.
+  switch (node->getPhysicalType()) {
+    case P::PhysicalType::kAggregate: {
+      const P::AggregatePtr aggregate =
+          std::static_pointer_cast<const P::Aggregate>(node);
+
+      std::vector<E::ExpressionPtr> expressions;
+      // Gather grouping expressions and aggregate functions' argument expressions.
+      for (const auto &expr : aggregate->grouping_expressions()) {
+        expressions.emplace_back(expr);
+      }
+      for (const auto &expr : aggregate->aggregate_expressions()) {
+        const E::AggregateFunctionPtr &func =
+            std::static_pointer_cast<const E::AggregateFunction>(expr->expression());
+        for (const auto &arg : func->getArguments()) {
+          expressions.emplace_back(arg);
+        }
+      }
+
+      // Transform the expressions so that common subexpressions are labelled.
+      const std::vector<E::ExpressionPtr> new_expressions =
+          transformExpressions(expressions);
+
+      if (new_expressions != expressions) {
+        std::vector<E::AliasPtr> new_aggregate_expressions;
+        std::vector<E::NamedExpressionPtr> new_grouping_expressions;
+
+        // Reconstruct grouping expressions.
+        const std::size_t num_grouping_expressions =
+            aggregate->grouping_expressions().size();
+        for (std::size_t i = 0; i < num_grouping_expressions; ++i) {
+          DCHECK(E::SomeNamedExpression::Matches(new_expressions[i]));
+          new_grouping_expressions.emplace_back(
+              std::static_pointer_cast<const E::NamedExpression>(new_expressions[i]));
+        }
+
+        // Reconstruct aggregate expressions.
+        auto it = new_expressions.begin() + num_grouping_expressions;
+        for (const auto &expr : aggregate->aggregate_expressions()) {
+          const E::AggregateFunctionPtr &func =
+              std::static_pointer_cast<const E::AggregateFunction>(
+                  expr->expression());
+
+          std::vector<E::ScalarPtr> new_arguments;
+          for (std::size_t i = 0; i < func->getArguments().size(); ++i, ++it) {
+            DCHECK(E::SomeScalar::Matches(*it));
+            new_arguments.emplace_back(std::static_pointer_cast<const E::Scalar>(*it));
+          }
+
+          if (new_arguments == func->getArguments()) {
+            new_aggregate_expressions.emplace_back(expr);
+          } else {
+            const E::AggregateFunctionPtr new_func =
+                E::AggregateFunction::Create(func->getAggregate(),
+                                             new_arguments,
+                                             func->is_vector_aggregate(),
+                                             func->is_distinct());
+            new_aggregate_expressions.emplace_back(
+                E::Alias::Create(expr->id(),
+                                 new_func,
+                                 expr->attribute_name(),
+                                 expr->attribute_alias(),
+                                 expr->relation_name()));
+          }
+        }
+        return P::Aggregate::Create(aggregate->input(),
+                                    new_grouping_expressions,
+                                    new_aggregate_expressions,
+                                    aggregate->filter_predicate());
+      }
+      break;
+    }
+    case P::PhysicalType::kSelection: {
+      const P::SelectionPtr selection =
+          std::static_pointer_cast<const P::Selection>(node);
+
+      // Transform Selection's project expressions.
+      const std::vector<E::NamedExpressionPtr> new_expressions =
+          DownCast<E::NamedExpression>(
+              transformExpressions(UpCast(selection->project_expressions())));
+
+      if (new_expressions != selection->project_expressions()) {
+        return P::Selection::Create(selection->input(),
+                                    new_expressions,
+                                    selection->filter_predicate());
+      }
+      break;
+    }
+    case P::PhysicalType::kHashJoin: {
+      const P::HashJoinPtr hash_join =
+          std::static_pointer_cast<const P::HashJoin>(node);
+
+      // Transform HashJoin's project expressions.
+      const std::vector<E::NamedExpressionPtr> new_expressions =
+          DownCast<E::NamedExpression>(
+              transformExpressions(UpCast(hash_join->project_expressions())));
+
+      if (new_expressions != hash_join->project_expressions()) {
+        return P::HashJoin::Create(hash_join->left(),
+                                   hash_join->right(),
+                                   hash_join->left_join_attributes(),
+                                   hash_join->right_join_attributes(),
+                                   hash_join->residual_predicate(),
+                                   new_expressions,
+                                   hash_join->join_type());
+      }
+      break;
+    }
+    case P::PhysicalType::kNestedLoopsJoin: {
+      const P::NestedLoopsJoinPtr nested_loops_join =
+          std::static_pointer_cast<const P::NestedLoopsJoin>(node);
+
+      // Transform NestedLoopsJoin's project expressions.
+      const std::vector<E::NamedExpressionPtr> new_expressions =
+          DownCast<E::NamedExpression>(
+              transformExpressions(UpCast(nested_loops_join->project_expressions())));
+
+      if (new_expressions != nested_loops_join->project_expressions()) {
+        return P::NestedLoopsJoin::Create(nested_loops_join->left(),
+                                          nested_loops_join->right(),
+                                          nested_loops_join->join_predicate(),
+                                          new_expressions);
+      }
+      break;
+    }
+    default:
+      break;
+  }
+
+  return node;
+}
+
+std::vector<E::ExpressionPtr> ExtractCommonSubexpression::transformExpressions(
+    const std::vector<E::ExpressionPtr> &expressions) {
+  // Step 1. For each subexpression, count the number of its occurrences.
+  ScalarCounter counter;
+  ScalarHashable hashable;
+  for (const auto &expr : expressions) {
+    visitAndCount(expr, &counter, &hashable);
+  }
+
+  // Note that any subexpression with count > 1 is a common subexpression.
+  // However, it is not necessary to create a CommonSubexpression node for every
+  // such subexpression. E.g. consider the case
+  // --
+  //   SELECT (x+1)*(x+2), (x+1)*(x+2)*3 FROM s;
+  // --
+  // We only need to create one *dominant* CommonSubexpression (x+1)*(x+2) and
+  // do not need to create the child (x+1) or (x+2) nodes.
+  //
+  // To address this issue. We define that a subtree S *dominates* its descendent
+  // subtree (or leaf node) T if and only if counter[S] >= counter[T].
+  //
+  // Then we create CommonSubexpression nodes for every subexpression that is
+  // not dominated by any ancestor.
+
+  ScalarMap substitution_map;
+  std::vector<E::ExpressionPtr> new_expressions;
+  for (const auto &expr : expressions) {
+    new_expressions.emplace_back(
+        visitAndTransform(expr, 1, counter, hashable, &substitution_map));
+  }
+  return new_expressions;
+}
+
+E::ExpressionPtr ExtractCommonSubexpression::transformExpression(
+    const E::ExpressionPtr &expression) {
+  return transformExpressions({expression}).front();
+}
+
+bool ExtractCommonSubexpression::visitAndCount(
+    const E::ExpressionPtr &expression,
+    ScalarCounter *counter,
+    ScalarHashable *hashable) {
+  // This bool flag is for avoiding some unnecessary hash() computation.
+  bool children_hashable = true;
+
+  const auto homogeneous_expression_types_it =
+      homogeneous_expression_types_.find(expression->getExpressionType());
+  if (homogeneous_expression_types_it != homogeneous_expression_types_.end()) {
+    for (const auto &child : expression->children()) {
+      children_hashable &= visitAndCount(child, counter, hashable);
+    }
+  }
+
+  E::ScalarPtr scalar;
+  if (children_hashable &&
+      E::SomeScalar::MatchesWithConditionalCast(expression, &scalar)) {
+    // A scalar expression may or may not support the hash() computation.
+    // In the later case, a HashNotSupported exception will be thrown and we
+    // simply ignore this expression (and all its ancestor expressions).
+    try {
+      ++(*counter)[scalar];
+    } catch (const HashNotSupported &e) {
+      return false;
+    }
+    hashable->emplace(scalar);
+    return true;
+  }
+  return false;
+}
+
+E::ExpressionPtr ExtractCommonSubexpression::visitAndTransform(
+    const E::ExpressionPtr &expression,
+    const std::size_t max_reference_count,
+    const ScalarCounter &counter,
+    const ScalarHashable &hashable,
+    ScalarMap *substitution_map) {
+  // TODO(jianqiao): Currently it is hardly beneficial to make AttributeReference
+  // a common subexpression due to the inefficiency of ScalarAttribute's
+  // size-not-known-at-compile-time std::memcpy() calls, compared to copy-elision
+  // at selection level. Even in the case of compressed column store, it requires
+  // an attribute to occur at least 4 times for the common subexpression version
+  // to outperform the direct decoding version. We may look into ScalarAttribute
+  // and modify the heuristic here later.
+  if (expression->getExpressionType() == E::ExpressionType::kScalarLiteral ||
+      expression->getExpressionType() == E::ExpressionType::kAttributeReference) {
+    return expression;
+  }
+
+  E::ScalarPtr scalar;
+  const bool is_hashable =
+      E::SomeScalar::MatchesWithConditionalCast(expression, &scalar)
+          && hashable.find(scalar) != hashable.end();
+
+  std::size_t new_max_reference_count;
+  if (is_hashable) {
+    // CommonSubexpression node already generated somewhere. Just refer to that
+    // and return.
+    const auto substitution_map_it = substitution_map->find(scalar);
+    if (substitution_map_it != substitution_map->end()) {
+      return substitution_map_it->second;
+    }
+
+    // Otherwise, update the dominance count.
+    const auto counter_it = counter.find(scalar);
+    DCHECK(counter_it != counter.end());
+    DCHECK_LE(max_reference_count, counter_it->second);
+    new_max_reference_count = counter_it->second;
+  } else {
+    new_max_reference_count = max_reference_count;
+  }
+
+  // Process children.
+  std::vector<E::ExpressionPtr> new_children;
+  const auto homogeneous_expression_types_it =
+      homogeneous_expression_types_.find(expression->getExpressionType());
+  if (homogeneous_expression_types_it == homogeneous_expression_types_.end()) {
+    // If child subexpressions cannot be hoisted through the current expression,
+    // treat child expressions as isolated sub-optimizations.
+    for (const auto &child : expression->children()) {
+      new_children.emplace_back(transformExpression(child));
+    }
+  } else {
+    for (const auto &child : expression->children()) {
+      new_children.emplace_back(
+          visitAndTransform(child,
+                            new_max_reference_count,
+                            counter,
+                            hashable,
+                            substitution_map));
+    }
+  }
+
+  E::ExpressionPtr output;
+  if (new_children == expression->children()) {
+    output = expression;
+  } else {
+    output = std::static_pointer_cast<const E::Scalar>(
+        expression->copyWithNewChildren(new_children));
+  }
+
+  // Wrap it with a new CommonSubexpression node if the current expression is a
+  // dominant subexpression.
+  if (is_hashable && new_max_reference_count > max_reference_count) {
+    DCHECK(E::SomeScalar::Matches(output));
+    const E::CommonSubexpressionPtr common_subexpression =
+        E::CommonSubexpression::Create(
+            optimizer_context_->nextExprId(),
+            std::static_pointer_cast<const E::Scalar>(output));
+    substitution_map->emplace(scalar, common_subexpression);
+    output = common_subexpression;
+  }
+
+  return output;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep
diff --git a/query_optimizer/rules/ExtractCommonSubexpression.hpp b/query_optimizer/rules/ExtractCommonSubexpression.hpp
new file mode 100644
index 0000000..3cdd70e
--- /dev/null
+++ b/query_optimizer/rules/ExtractCommonSubexpression.hpp
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_EXTRACT_COMMON_SUBEXPRESSION_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_EXTRACT_COMMON_SUBEXPRESSION_HPP_
+
+#include <cstddef>
+#include <functional>
+#include <memory>
+#include <string>
+#include <type_traits>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "query_optimizer/expressions/CommonSubexpression.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+class OptimizerContext;
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Rule that applies to a physical plan to identify and label common
+ *        subexpressions.
+ *
+ * @note This is essentially a logical optimization pass. However, we need some
+ *       of the physical passes (e.g. ReuseAggregateExpressions) to be finalized
+ *       before this one to maximize optimization opportunities.
+ */
+class ExtractCommonSubexpression : public Rule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param optimizer_context The optimizer context.
+   */
+  explicit ExtractCommonSubexpression(OptimizerContext *optimizer_context);
+
+  ~ExtractCommonSubexpression() override {}
+
+  std::string getName() const override {
+    return "ExtractCommonSubexpression";
+  }
+
+  physical::PhysicalPtr apply(const physical::PhysicalPtr &input) override;
+
+ private:
+  physical::PhysicalPtr applyInternal(const physical::PhysicalPtr &input);
+
+  struct ScalarHash {
+    inline std::size_t operator()(const expressions::ScalarPtr &scalar) const {
+      return scalar->hash();
+    }
+  };
+
+  struct ScalarEqual {
+    inline bool operator()(const expressions::ScalarPtr &lhs,
+                           const expressions::ScalarPtr &rhs) const {
+      return lhs->equals(rhs);
+    }
+  };
+
+  // For memorizing whether a subexpression is hashable.
+  using ScalarHashable = std::unordered_set<expressions::ScalarPtr>;
+
+  // For counting the number of occurrences of each subexpression.
+  using ScalarCounter =
+      std::unordered_map<expressions::ScalarPtr, std::size_t, ScalarHash, ScalarEqual>;
+
+  // For mapping each subexpression to its transformed version.
+  using ScalarMap =
+      std::unordered_map<expressions::ScalarPtr,
+                         expressions::CommonSubexpressionPtr,
+                         ScalarHash,
+                         ScalarEqual>;
+
+  std::vector<expressions::ExpressionPtr> transformExpressions(
+      const std::vector<expressions::ExpressionPtr> &expressions);
+
+  expressions::ExpressionPtr transformExpression(
+      const expressions::ExpressionPtr &expression);
+
+  // Traverse the expression tree and increase the count of each subexpression.
+  bool visitAndCount(
+      const expressions::ExpressionPtr &expression,
+      ScalarCounter *counter,
+      ScalarHashable *hashable);
+
+  // Traverse the expression tree and transform subexpressions (to common
+  // subexpressions) if applicable.
+  expressions::ExpressionPtr visitAndTransform(
+      const expressions::ExpressionPtr &expression,
+      const std::size_t max_reference_count,
+      const ScalarCounter &counter,
+      const ScalarHashable &hashable,
+      ScalarMap *substitution_map);
+
+  template <typename ScalarSubclassT>
+  static std::vector<expressions::ExpressionPtr> UpCast(
+      const std::vector<std::shared_ptr<const ScalarSubclassT>> &expressions) {
+    std::vector<expressions::ExpressionPtr> output;
+    for (const auto &expr : expressions) {
+      output.emplace_back(expr);
+    }
+    return output;
+  }
+
+  template <typename ScalarSubclassT>
+  static std::vector<std::shared_ptr<const ScalarSubclassT>> DownCast(
+      const std::vector<expressions::ExpressionPtr> &expressions) {
+    std::vector<std::shared_ptr<const ScalarSubclassT>> output;
+    for (const auto &expr : expressions) {
+      output.emplace_back(std::static_pointer_cast<const ScalarSubclassT>(expr));
+    }
+    return output;
+  }
+
+  struct ExpressionTypeHash {
+    using UnderT = std::underlying_type<expressions::ExpressionType>::type;
+
+    inline std::size_t operator()(const expressions::ExpressionType &et) const {
+      return std::hash<UnderT>()(static_cast<UnderT>(et));
+    }
+  };
+
+  // Here we define that an expression type is homogeneous if at execution time
+  // the result column vector of that expression has a one-to-one positional
+  // correspondence with all the result column vectors from its child expressions.
+  // E.g. aggregate functions and CASE expressions are not homogeneous.
+  //
+  // Being homogeneous enables common subexpressions to be hoisted through.
+  // For example, consider the case
+  // --
+  //   (x * 2) + F(x * 2)
+  // --
+  // where F is some unary expression. Then if F is homogenous, we can mark the
+  // two (x * 2) as common subexpressions. Otherwise, we cannot do that since
+  // the two subexpressions will generate different result column vectors.
+  std::unordered_set<expressions::ExpressionType,
+                     ExpressionTypeHash> homogeneous_expression_types_;
+
+  OptimizerContext *optimizer_context_;
+
+  DISALLOW_COPY_AND_ASSIGN(ExtractCommonSubexpression);
+};
+
+/** @} */
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_EXTRACT_COMMON_SUBEXPRESSION_HPP_
diff --git a/query_optimizer/rules/ReuseAggregateExpressions.cpp b/query_optimizer/rules/ReuseAggregateExpressions.cpp
new file mode 100644
index 0000000..79dede6
--- /dev/null
+++ b/query_optimizer/rules/ReuseAggregateExpressions.cpp
@@ -0,0 +1,349 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#include "query_optimizer/rules/ReuseAggregateExpressions.hpp"
+
+#include <cstddef>
+#include <list>
+#include <map>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "expressions/aggregation/AggregateFunction.hpp"
+#include "expressions/aggregation/AggregateFunctionFactory.hpp"
+#include "expressions/aggregation/AggregationID.hpp"
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/expressions/AggregateFunction.hpp"
+#include "query_optimizer/expressions/Alias.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/BinaryExpression.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Aggregate.hpp"
+#include "query_optimizer/physical/PatternMatcher.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/physical/PhysicalType.hpp"
+#include "query_optimizer/physical/Selection.hpp"
+#include "query_optimizer/physical/TopLevelPlan.hpp"
+#include "types/operations/binary_operations/BinaryOperation.hpp"
+#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
+#include "types/operations/binary_operations/BinaryOperationID.hpp"
+#include "utility/HashError.hpp"
+
+#include "gflags/gflags.h"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+DEFINE_uint64(reuse_aggregate_group_size_threshold, 1000u,
+              "The threshold on estimated number of groups for an aggregation "
+              "below which the ReuseAggregateExpressions optimization will be "
+              "performed.");
+
+DEFINE_double(reuse_aggregate_ratio_threshold, 0.3,
+              "The threshold on the ratio of (# of eliminable columns) to "
+              "(# of original columns) for an aggregation above which the "
+              "ReuseAggregateExpressions optimization will be performed.");
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace P = ::quickstep::optimizer::physical;
+
+void ReuseAggregateExpressions::init(const P::PhysicalPtr &input) {
+  DCHECK(input->getPhysicalType() == P::PhysicalType::kTopLevelPlan);
+
+  // Initialize cost model.
+  const P::TopLevelPlanPtr top_level_plan =
+     std::static_pointer_cast<const P::TopLevelPlan>(input);
+  cost_model_.reset(
+      new cost::StarSchemaSimpleCostModel(top_level_plan->shared_subplans()));
+}
+
+P::PhysicalPtr ReuseAggregateExpressions::applyToNode(
+    const P::PhysicalPtr &input) {
+  P::AggregatePtr aggregate;
+  if (!P::SomeAggregate::MatchesWithConditionalCast(input, &aggregate)) {
+    return input;
+  }
+
+  const std::vector<E::AliasPtr> &agg_exprs = aggregate->aggregate_expressions();
+
+  // Maps aggregated expressions to AggregationID + positions.
+  //
+  // For example,
+  // --
+  //   SELECT SUM(x+1), AVG(x+1), SUM(x+1), COUNT(*), SUM(y) FROM s;
+  // --
+  // will generate *agg_expr_info* as
+  // --
+  // {
+  //   x+1: { kSum: [0, 2], kAvg: [1] },
+  //   y: { kSum: [4] },
+  // }
+  // --
+  // and COUNT(*) will be recorded inside *count_star_info*.
+  std::unordered_map<E::ScalarPtr,
+                     std::map<AggregationID, std::vector<std::size_t>>,
+                     ScalarHash, ScalarEqual> agg_expr_info;
+  std::list<std::size_t> count_star_info;
+
+  for (std::size_t i = 0; i < agg_exprs.size(); ++i) {
+    DCHECK(agg_exprs[i]->expression()->getExpressionType()
+               == E::ExpressionType::kAggregateFunction);
+    const E::AggregateFunctionPtr agg_expr =
+        std::static_pointer_cast<const E::AggregateFunction>(
+            agg_exprs[i]->expression());
+
+    // Skip DISTINCT aggregations.
+    if (agg_expr->is_distinct()) {
+      continue;
+    }
+
+    const AggregationID agg_id = agg_expr->getAggregate().getAggregationID();
+    const std::vector<E::ScalarPtr> &arguments = agg_expr->getArguments();
+
+    // Currently we only consider aggregate functions with 0 or 1 argument.
+    if (agg_id == AggregationID::kCount) {
+      if (arguments.empty()) {
+        count_star_info.emplace_front(i);
+        continue;
+      } else if (!arguments.front()->getValueType().isNullable()) {
+        // For COUNT(x) where x is not null, we view it as a more general COUNT(*).
+        count_star_info.emplace_back(i);
+        continue;
+      }
+    }
+    if (arguments.size() == 1) {
+      try {
+        agg_expr_info[arguments.front()][agg_id].emplace_back(i);
+      } catch (const HashNotSupported &e) {
+        continue;
+      }
+    }
+  }
+
+  // Now for each aggregate expression, figure out whether we can avoid the
+  // computation by reusing other aggregate expression's result.
+  std::vector<std::unique_ptr<AggregateReference>> agg_refs(agg_exprs.size());
+
+  constexpr std::size_t kInvalidRef = static_cast<std::size_t>(-1);
+  std::size_t count_star_ref;
+
+  // Check whether COUNT(*) is available.
+  if (count_star_info.empty()) {
+    count_star_ref = kInvalidRef;
+  } else {
+    auto it = count_star_info.begin();
+    count_star_ref = *it;
+
+    for (++it; it != count_star_info.end(); ++it) {
+      agg_refs[*it].reset(new AggregateReference(count_star_ref));
+    }
+  }
+
+  // Iterate through agg_expr_info to find all transformation opportunities,
+  // and record them into agg_refs.
+  for (const auto &it : agg_expr_info) {
+    const auto &ref_map = it.second;
+
+    // First, check whether AVG can be reduced to SUM/COUNT.
+    bool is_avg_processed = false;
+
+    const auto avg_it = ref_map.find(AggregationID::kAvg);
+    if (avg_it != ref_map.end()) {
+      std::size_t count_ref = kInvalidRef;
+
+      // To reduce AVG to SUM/COUNT, we need a COUNT available.
+      // TODO(jianqiao): We may even add a new COUNT(*) aggregation if it is not
+      // there. E.g. when there are at least two AVG(...) aggregate functions.
+      if (it.first->getValueType().isNullable()) {
+        const auto count_it = ref_map.find(AggregationID::kCount);
+        if (count_it != ref_map.end()) {
+          DCHECK(!count_it->second.empty());
+          count_ref = count_it->second.front();
+        }
+      } else {
+        count_ref = count_star_ref;
+      }
+
+      if (count_ref != kInvalidRef) {
+        // It is done if there is an existing SUM. Otherwise we strength-reduce
+        // the current AVG to SUM.
+        const auto sum_it = ref_map.find(AggregationID::kSum);
+        const std::size_t sum_ref =
+            sum_it == ref_map.end() ? kInvalidRef : sum_it->second.front();
+
+        for (const std::size_t idx : avg_it->second) {
+          agg_refs[idx].reset(new AggregateReference(sum_ref, count_ref));
+        }
+        is_avg_processed = true;
+      }
+    }
+
+    // Then, identify duplicate aggregate expressions.
+    for (const auto &ref_it : ref_map) {
+      if (ref_it.first == AggregationID::kAvg && is_avg_processed) {
+        continue;
+      }
+      const auto &indices = ref_it.second;
+      DCHECK(!indices.empty());
+      const std::size_t ref = indices.front();
+      for (std::size_t i = 1; i < indices.size(); ++i) {
+        agg_refs[indices[i]].reset(new AggregateReference(ref));
+      }
+    }
+  }
+
+  // Count the number of eliminable aggregate expressions.
+  std::size_t num_eliminable = 0;
+  for (const auto &agg_ref : agg_refs) {
+    if (agg_ref != nullptr) {
+      ++num_eliminable;
+    }
+  }
+
+  if (num_eliminable == 0) {
+    return input;
+  }
+
+  // Now we need to make a decision since it is not always benefitial to perform
+  // the transformation. Currently we employ a simple heuristic that if either
+  // (1) The estimated number of groups for this Aggregate node is smaller than
+  //     the specified FLAGS_reuse_aggregate_group_size_threshold.
+  // or
+  // (2) The ratio of (# of eliminable columns) to (# of original columns) is
+  //     greater than the specified FLAGS_reuse_aggregate_ratio_threshold.
+  // then we perform the transformation.
+  const bool is_group_size_condition_satisfied =
+      cost_model_->estimateNumGroupsForAggregate(aggregate)
+          < FLAGS_reuse_aggregate_group_size_threshold;
+  const bool is_ratio_condition_satisfied =
+      static_cast<double>(num_eliminable) / agg_exprs.size()
+          > FLAGS_reuse_aggregate_ratio_threshold;
+
+  if (!is_group_size_condition_satisfied && !is_ratio_condition_satisfied) {
+    return input;
+  }
+
+  // Now we transform the original Aggregate to a reduced Aggregate + a wrapping
+  // Selection.
+
+  // Aggregate expressions for the new Aggregate.
+  std::vector<E::AliasPtr> new_agg_exprs;
+
+  // Project expressions for the new Selection.
+  std::vector<E::NamedExpressionPtr> new_select_exprs;
+
+  for (const auto &grouping_expr : aggregate->grouping_expressions()) {
+    new_select_exprs.emplace_back(E::ToRef(grouping_expr));
+  }
+
+  const std::vector<E::AttributeReferencePtr> agg_attrs = E::ToRefVector(agg_exprs);
+
+  for (std::size_t i = 0; i < agg_refs.size(); ++i) {
+    const auto &agg_ref = agg_refs[i];
+    const E::AliasPtr &agg_expr = agg_exprs[i];
+
+    if (agg_ref == nullptr) {
+      // Case 1: this aggregate expression can not be eliminated.
+      new_agg_exprs.emplace_back(agg_expr);
+      new_select_exprs.emplace_back(
+          E::AttributeReference::Create(agg_expr->id(),
+                                        agg_expr->attribute_name(),
+                                        agg_expr->attribute_alias(),
+                                        agg_expr->relation_name(),
+                                        agg_expr->getValueType(),
+                                        E::AttributeReferenceScope::kLocal));
+    } else {
+      switch (agg_ref->kind) {
+        // Case 2.1: this aggregate expression can be eliminated.
+        case AggregateReference::kDirect: {
+          new_select_exprs.emplace_back(
+              E::Alias::Create(agg_expr->id(),
+                               agg_attrs[agg_ref->first_ref],
+                               agg_expr->attribute_name(),
+                               agg_expr->attribute_alias(),
+                               agg_expr->relation_name()));
+          break;
+        }
+        // Case 2.2: this aggregate expression is an AVG.
+        case AggregateReference::kAvg: {
+          E::AttributeReferencePtr sum_attr;
+          if (agg_ref->first_ref == kInvalidRef) {
+            // Case 2.2.1: If there is no existing SUM, we need to convert this
+            //             AVG to SUM.
+            const E::AggregateFunctionPtr avg_expr =
+                std::static_pointer_cast<const E::AggregateFunction>(agg_expr->expression());
+
+            const AggregateFunction &sum_func =
+                AggregateFunctionFactory::Get(AggregationID::kSum);
+            const E::AggregateFunctionPtr sum_expr =
+                E::AggregateFunction::Create(sum_func,
+                                             avg_expr->getArguments(),
+                                             avg_expr->is_vector_aggregate(),
+                                             avg_expr->is_distinct());
+            new_agg_exprs.emplace_back(
+                E::Alias::Create(optimizer_context_->nextExprId(),
+                                 sum_expr,
+                                 agg_expr->attribute_name(),
+                                 agg_expr->attribute_alias(),
+                                 agg_expr->relation_name()));
+
+            sum_attr = E::ToRef(new_agg_exprs.back());
+          } else {
+            // Case 2.2.2: If there is a SUM somewhere, we just eliminate this
+            //             AVG and use the result from that SUM.
+            sum_attr = agg_attrs[agg_ref->first_ref];
+          }
+
+          // Obtain AVG by evaluating SUM/COUNT in Selection.
+          const BinaryOperation &divide_op =
+              BinaryOperationFactory::GetBinaryOperation(BinaryOperationID::kDivide);
+          const E::BinaryExpressionPtr avg_expr =
+              E::BinaryExpression::Create(divide_op,
+                                          sum_attr,
+                                          agg_attrs[agg_ref->second_ref]);
+          new_select_exprs.emplace_back(
+              E::Alias::Create(agg_expr->id(),
+                               avg_expr,
+                               agg_expr->attribute_name(),
+                               agg_expr->attribute_alias(),
+                               agg_expr->relation_name()));
+        }
+      }
+    }
+  }
+
+  // Construct the reduced Aggregate.
+  const P::AggregatePtr new_aggregate =
+      P::Aggregate::Create(aggregate->input(),
+                           aggregate->grouping_expressions(),
+                           new_agg_exprs,
+                           aggregate->filter_predicate());
+
+  // Construct the wrapping Selection.
+  return P::Selection::Create(new_aggregate, new_select_exprs, nullptr);
+}
+
+}  // namespace optimizer
+}  // namespace quickstep
diff --git a/query_optimizer/rules/ReuseAggregateExpressions.hpp b/query_optimizer/rules/ReuseAggregateExpressions.hpp
new file mode 100644
index 0000000..182e9d9
--- /dev/null
+++ b/query_optimizer/rules/ReuseAggregateExpressions.hpp
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_REUSE_AGGREGATE_EXPRESSIONS_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_REUSE_AGGREGATE_EXPRESSIONS_HPP_
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "query_optimizer/cost_model/StarSchemaSimpleCostModel.hpp"
+#include "query_optimizer/expressions/Scalar.hpp"
+#include "query_optimizer/physical/Physical.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+class OptimizerContext;
+
+/** \addtogroup OptimizerRules
+ *  @{
+ */
+
+/**
+ * @brief Rule that applies to a physical plan to eliminate duplicate aggregate
+ *        expressions and convert AVG to SUM/COUNT if appropriate.
+ *
+ * This rule rewrites Aggregate to Selection + Aggregate to eliminate duplicates
+ * and strength-reduce AVG functions. E.g.
+ * --
+ *   SELECT SUM(x), SUM(y), SUM(x) * 2, AVG(y), COUNT(*)
+ *   FROM s;
+ * --
+ * will be rewritten as (assume y is not null)
+ * --
+ *   SELECT sum_x, sum_y, sum_x * 2, sum_y / cnt, cnt
+ *   FROM (
+ *     SELECT SUM(x) AS sum_x, SUM(y) AS sum_y, COUNT(*) AS cnt
+ *   ) t;
+ * --
+ *
+ * Meanwhile, note that currently it is not free-of-cost to "re-project" the
+ * columns. So it may not worth doing the transformation in some situations.
+ * E.g. it may actually slow down the query to rewrite
+ * --
+ *   SELECT SUM(a), SUM(b), SUM(c), SUM(d), SUM(a)
+ *   FROM s
+ *   GROUP BY x;
+ * --
+ * as
+ * --
+ *   SELECT sum_a, sum_b, sum_c, sum_d, sum_a
+ *   FROM (
+ *     SELECT SUM(a) AS sum_a, SUM(b) AS sum_b, SUM(c) AS sum_c, SUM(d) AS sum_d
+ *     FROM s
+ *     GROUP BY x;
+ *   ) t;
+ * --
+ * if the number of distinct values of attribute x is large -- in this case, we
+ * avoid one duplicate computation of SUM(a), but introduce 5 extra expensive
+ * column copying with the outer Selection.
+ *
+ * To address the above problem, currently we employ a simple heuristic that if
+ * either
+ * (1) The estimated number of groups for this Aggregate node is smaller than
+ *     the specified FLAGS_reuse_aggregate_group_size_threshold.
+ * or
+ * (2) The ratio of (# of eliminable columns) to (# of original columns) is
+ *     greater than the specified FLAGS_reuse_aggregate_ratio_threshold.
+ * then we perform the transformation.
+ */
+class ReuseAggregateExpressions : public BottomUpRule<physical::Physical> {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param optimizer_context The optimizer context.
+   */
+  explicit ReuseAggregateExpressions(OptimizerContext *optimizer_context)
+      : optimizer_context_(optimizer_context) {}
+
+  std::string getName() const override {
+    return "ReuseAggregateExpressions";
+  }
+
+ protected:
+  void init(const physical::PhysicalPtr &input) override;
+
+  physical::PhysicalPtr applyToNode(const physical::PhysicalPtr &input) override;
+
+ private:
+  struct ScalarHash {
+    inline std::size_t operator()(const expressions::ScalarPtr &scalar) const {
+      return scalar->hash();
+    }
+  };
+
+  struct ScalarEqual {
+    inline bool operator()(const expressions::ScalarPtr &lhs,
+                           const expressions::ScalarPtr &rhs) const {
+      return lhs->equals(rhs);
+    }
+  };
+
+  // This data structure indicates for each aggregate expression whether the
+  // expression can be eliminated by refering to another identical expression,
+  // or can be strength-reduced from AVG to SUM.
+  struct AggregateReference {
+    enum Kind {
+      kDirect = 0,
+      kAvg
+    };
+
+    explicit AggregateReference(const std::size_t ref)
+        : kind(kDirect), first_ref(ref), second_ref(0) {}
+
+    AggregateReference(const std::size_t sum_ref, const std::size_t count_ref)
+        : kind(kAvg), first_ref(sum_ref), second_ref(count_ref) {}
+
+    const Kind kind;
+    const std::size_t first_ref;
+    const std::size_t second_ref;
+  };
+
+  OptimizerContext *optimizer_context_;
+
+  std::unique_ptr<cost::StarSchemaSimpleCostModel> cost_model_;
+
+  DISALLOW_COPY_AND_ASSIGN(ReuseAggregateExpressions);
+};
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_QUERY_OPTIMIZER_RULES_REUSE_AGGREGATE_EXPRESSIONS_HPP_
diff --git a/query_optimizer/tests/execution_generator/CMakeLists.txt b/query_optimizer/tests/execution_generator/CMakeLists.txt
index 40629ee..595d09d 100644
--- a/query_optimizer/tests/execution_generator/CMakeLists.txt
+++ b/query_optimizer/tests/execution_generator/CMakeLists.txt
@@ -15,6 +15,11 @@
 # specific language governing permissions and limitations
 # under the License.
 
+add_test(quickstep_queryoptimizer_tests_executiongenerator_commonsubexpression
+         "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/CommonSubexpression.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpression.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpression/")
 add_test(quickstep_queryoptimizer_tests_executiongenerator_create
          "../quickstep_queryoptimizer_tests_ExecutionGeneratorTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -77,6 +82,11 @@
          "${CMAKE_CURRENT_BINARY_DIR}/Update/")
 
 if (ENABLE_DISTRIBUTED)
+  add_test(quickstep_queryoptimizer_tests_executiongenerator_commonsubexpression_distributed
+           "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
+           "${CMAKE_CURRENT_SOURCE_DIR}/CommonSubexpression.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpressionDistributed.test"
+           "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpressionDistributed/")
   add_test(quickstep_queryoptimizer_tests_executiongenerator_create_distributed
            "../quickstep_queryoptimizer_tests_DistributedExecutionGeneratorTest"
            "${CMAKE_CURRENT_SOURCE_DIR}/Create.test"
@@ -141,6 +151,7 @@
 
 # Create the folders where the unit tests will store their data blocks for the
 # duration of their test.
+file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpression)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Create)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Delete)
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Distinct)
@@ -155,6 +166,7 @@
 file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/Update)
 
 if (ENABLE_DISTRIBUTED)
+  file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpressionDistributed)
   file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/CreateDistributed)
   file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DeleteDistributed)
   file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/DistinctDistributed)
diff --git a/query_optimizer/tests/execution_generator/CommonSubexpression.test b/query_optimizer/tests/execution_generator/CommonSubexpression.test
new file mode 100644
index 0000000..e0b5e2d
--- /dev/null
+++ b/query_optimizer/tests/execution_generator/CommonSubexpression.test
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+SELECT int_col+1, 1+int_col
+FROM test
+ORDER BY int_col
+LIMIT 5;
+--
++-----------+-----------+
+|(int_col+1)|(1+int_col)|
++-----------+-----------+
+|        -22|        -22|
+|        -20|        -20|
+|        -18|        -18|
+|        -16|        -16|
+|        -14|        -14|
++-----------+-----------+
+==
+
+SELECT SUM(int_col+long_col), SUM((long_col+int_col)*2)
+FROM test;
+--
++-----------------------+---------------------------+
+|SUM((int_col+long_col))|SUM(((long_col+int_col)*2))|
++-----------------------+---------------------------+
+|                   4382|                       8764|
++-----------------------+---------------------------+
+==
+
+SELECT SUM(long_col+1), SUM(long_col+1)*2, AVG(long_col+1), AVG(long_col+1)*2, COUNT(*)
+FROM test;
+--
++--------------------+---------------------+--------------------+---------------------+--------------------+
+|SUM((long_col+1))   |(SUM((long_col+1))*2)|AVG((long_col+1))   |(AVG((long_col+1))*2)|COUNT(*)            |
++--------------------+---------------------+--------------------+---------------------+--------------------+
+|                4925|                 9850|                 197|                  394|                  25|
++--------------------+---------------------+--------------------+---------------------+--------------------+
+==
diff --git a/query_optimizer/tests/physical_generator/CMakeLists.txt b/query_optimizer/tests/physical_generator/CMakeLists.txt
index c752cdd..fc3390e 100644
--- a/query_optimizer/tests/physical_generator/CMakeLists.txt
+++ b/query_optimizer/tests/physical_generator/CMakeLists.txt
@@ -15,6 +15,10 @@
 # specific language governing permissions and limitations
 # under the License.
 
+add_test(quickstep_queryoptimizer_tests_physicalgenerator_commonsubexpression
+         "../quickstep_queryoptimizer_tests_OptimizerTextTest"
+         "${CMAKE_CURRENT_SOURCE_DIR}/CommonSubexpression.test"
+         "${CMAKE_CURRENT_BINARY_DIR}/CommonSubexpression.test")
 add_test(quickstep_queryoptimizer_tests_physicalgenerator_copy
          "../quickstep_queryoptimizer_tests_OptimizerTextTest"
          "${CMAKE_CURRENT_SOURCE_DIR}/Copy.test"
diff --git a/query_optimizer/tests/physical_generator/CommonSubexpression.test b/query_optimizer/tests/physical_generator/CommonSubexpression.test
new file mode 100644
index 0000000..b23a97d
--- /dev/null
+++ b/query_optimizer/tests/physical_generator/CommonSubexpression.test
@@ -0,0 +1,772 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[default optimized_logical_plan physical_plan]
+
+SELECT int_col+1, 1+int_col
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=(int_col+1),relation=,type=Int NULL]
+|   | +-Add
+|   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   +-Literal[value=1,type=Int]
+|   +-Alias[id=7,name=,alias=(1+int_col),relation=,type=Int NULL]
+|     +-Add
+|       +-Literal[value=1,type=Int]
+|       +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(int_col+1),relation=,type=Int NULL]
+  +-AttributeReference[id=7,name=,alias=(1+int_col),relation=,type=Int NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=TableReference[relation=Test,alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_expressions=
+|   +-Alias[id=6,name=,alias=(int_col+1),relation=,type=Int NULL]
+|   | +-CommonSubexpression[common_subexpression_id=8]
+|   |   +-Operand=Add
+|   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     +-Literal[value=1,type=Int]
+|   +-Alias[id=7,name=,alias=(1+int_col),relation=,type=Int NULL]
+|     +-CommonSubexpression[common_subexpression_id=8]
+|       +-Operand=Add
+|         +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|         +-Literal[value=1,type=Int]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(int_col+1),relation=,type=Int NULL]
+  +-AttributeReference[id=7,name=,alias=(1+int_col),relation=,type=Int NULL]
+==
+
+# Aggregate
+SELECT SUM(int_col+float_col), MIN((float_col+int_col)*2)
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=Aggregate
+| | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Double NULL]
+| |   | +-AggregateFunction[function=SUM]
+| |   |   +-Add
+| |   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |   |     +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,type=Float NULL]
+| |     +-AggregateFunction[function=MIN]
+| |       +-Multiply
+| |         +-Add
+| |         | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |         | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |         +-Literal[value=2,type=Int]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=SUM((int_col+float_col)),relation=,type=Double NULL]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Double NULL]
+|   +-Alias[id=7,name=,alias=MIN(((float_col+int_col)*2)),relation=,
+|     type=Float NULL]
+|     +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|       type=Float NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=SUM((int_col+float_col)),relation=,
+  | type=Double NULL]
+  +-AttributeReference[id=7,name=,alias=MIN(((float_col+int_col)*2)),relation=,
+    type=Float NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=Aggregate
+| | +-input=TableReference[relation=Test,alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Double NULL]
+| |   | +-AggregateFunction[function=SUM]
+| |   |   +-CommonSubexpression[common_subexpression_id=8]
+| |   |     +-Operand=Add
+| |   |       +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |   |       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,type=Float NULL]
+| |     +-AggregateFunction[function=MIN]
+| |       +-Multiply
+| |         +-CommonSubexpression[common_subexpression_id=8]
+| |         | +-Operand=Add
+| |         |   +-AttributeReference[id=0,name=int_col,relation=test,
+| |         |   | type=Int NULL]
+| |         |   +-AttributeReference[id=2,name=float_col,relation=test,
+| |         |     type=Float]
+| |         +-Literal[value=2,type=Int]
+| +-project_expressions=
+|   +-Alias[id=6,name=,alias=SUM((int_col+float_col)),relation=,type=Double NULL]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Double NULL]
+|   +-Alias[id=7,name=,alias=MIN(((float_col+int_col)*2)),relation=,
+|     type=Float NULL]
+|     +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|       type=Float NULL]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=SUM((int_col+float_col)),relation=,
+  | type=Double NULL]
+  +-AttributeReference[id=7,name=,alias=MIN(((float_col+int_col)*2)),relation=,
+    type=Float NULL]
+==
+
+# HashJoin
+SELECT int_col + j, (int_col + j) * float_col
+FROM test, (SELECT i, i * i AS j FROM generate_series(1, 10) AS g(i)) t
+WHERE int_col = i AND (int_col + i) < float_col;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=Filter
+| | +-input=HashJoin
+| | | +-left=TableReference[relation_name=Test,relation_alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | |   type=VarChar(20) NULL]
+| | | +-right=Project
+| | | | +-input=TableGenerator[function_name=generate_series,table_alias=g]
+| | | | | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | | | |   relation=generate_series,type=Int]
+| | | | +-project_list=
+| | | |   +-Alias[id=7,name=i,relation=,type=Int]
+| | | |   | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | | |   |   relation=generate_series,type=Int]
+| | | |   +-Alias[id=8,name=j,relation=t,type=Int]
+| | | |     +-Multiply
+| | | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | | |       | relation=generate_series,type=Int]
+| | | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | | |         relation=generate_series,type=Int]
+| | | +-left_join_attributes=
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-right_join_attributes=
+| | |   +-AttributeReference[id=7,name=i,relation=,type=Int]
+| | +-filter_predicate=Less
+| |   +-Add
+| |   | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |   | +-AttributeReference[id=7,name=i,relation=,type=Int]
+| |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-project_list=
+|   +-Alias[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+|   | +-Add
+|   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   +-AttributeReference[id=8,name=j,relation=t,type=Int]
+|   +-Alias[id=10,name=,alias=((int_col+j)*float_col),relation=,type=Float NULL]
+|     +-Multiply
+|       +-Add
+|       | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       | +-AttributeReference[id=8,name=j,relation=t,type=Int]
+|       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
++-output_attributes=
+  +-AttributeReference[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+  +-AttributeReference[id=10,name=,alias=((int_col+j)*float_col),relation=,
+    type=Float NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=HashJoin
+| +-left=TableGenerator[function_name=generate_series,table_alias=g]
+| | +-AttributeReference[id=6,name=generate_series,alias=g,
+| |   relation=generate_series,type=Int]
+| +-right=TableReference[relation=Test,alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-residual_predicate=Less
+| | +-Add
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |   relation=generate_series,type=Int]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-project_expressions=
+| | +-Alias[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+| | | +-CommonSubexpression[common_subexpression_id=11]
+| | |   +-Operand=Add
+| | |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |     +-Multiply
+| | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |       | relation=generate_series,type=Int]
+| | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |         relation=generate_series,type=Int]
+| | +-Alias[id=10,name=,alias=((int_col+j)*float_col),relation=,type=Float NULL]
+| |   +-Multiply
+| |     +-CommonSubexpression[common_subexpression_id=11]
+| |     | +-Operand=Add
+| |     |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |     |   +-Multiply
+| |     |     +-AttributeReference[id=6,name=generate_series,alias=g,
+| |     |     | relation=generate_series,type=Int]
+| |     |     +-AttributeReference[id=6,name=generate_series,alias=g,
+| |     |       relation=generate_series,type=Int]
+| |     +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-left_join_attributes=
+| | +-AttributeReference[id=6,name=generate_series,alias=g,
+| |   relation=generate_series,type=Int]
+| +-right_join_attributes=
+|   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
++-output_attributes=
+  +-AttributeReference[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+  +-AttributeReference[id=10,name=,alias=((int_col+j)*float_col),relation=,
+    type=Float NULL]
+==
+
+# NestedLoopsJoin
+SELECT int_col + j, (int_col + j) * float_col
+FROM test, (SELECT i, i * i AS j FROM generate_series(1, 10) AS g(i)) t
+WHERE (int_col + i) < float_col;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=NestedLoopsJoin
+| | +-left=TableReference[relation_name=Test,relation_alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-right=Project
+| | | +-input=TableGenerator[function_name=generate_series,table_alias=g]
+| | | | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | | |   relation=generate_series,type=Int]
+| | | +-project_list=
+| | |   +-Alias[id=7,name=i,relation=,type=Int]
+| | |   | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |   |   relation=generate_series,type=Int]
+| | |   +-Alias[id=8,name=j,relation=t,type=Int]
+| | |     +-Multiply
+| | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |       | relation=generate_series,type=Int]
+| | |       +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |         relation=generate_series,type=Int]
+| | +-join_predicate=Less
+| |   +-Add
+| |   | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| |   | +-AttributeReference[id=7,name=i,relation=,type=Int]
+| |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-project_list=
+|   +-Alias[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+|   | +-Add
+|   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   +-AttributeReference[id=8,name=j,relation=t,type=Int]
+|   +-Alias[id=10,name=,alias=((int_col+j)*float_col),relation=,type=Float NULL]
+|     +-Multiply
+|       +-Add
+|       | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       | +-AttributeReference[id=8,name=j,relation=t,type=Int]
+|       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
++-output_attributes=
+  +-AttributeReference[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+  +-AttributeReference[id=10,name=,alias=((int_col+j)*float_col),relation=,
+    type=Float NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=NestedLoopsJoin
+| +-left=TableReference[relation=Test,alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-right=TableGenerator[function_name=generate_series,table_alias=g]
+| | +-AttributeReference[id=6,name=generate_series,alias=g,
+| |   relation=generate_series,type=Int]
+| +-join_predicate=Less
+| | +-Add
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=6,name=generate_series,alias=g,
+| | |   relation=generate_series,type=Int]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| +-project_expressions=
+|   +-Alias[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+|   | +-CommonSubexpression[common_subexpression_id=11]
+|   |   +-Operand=Add
+|   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     +-Multiply
+|   |       +-AttributeReference[id=6,name=generate_series,alias=g,
+|   |       | relation=generate_series,type=Int]
+|   |       +-AttributeReference[id=6,name=generate_series,alias=g,
+|   |         relation=generate_series,type=Int]
+|   +-Alias[id=10,name=,alias=((int_col+j)*float_col),relation=,type=Float NULL]
+|     +-Multiply
+|       +-CommonSubexpression[common_subexpression_id=11]
+|       | +-Operand=Add
+|       |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       |   +-Multiply
+|       |     +-AttributeReference[id=6,name=generate_series,alias=g,
+|       |     | relation=generate_series,type=Int]
+|       |     +-AttributeReference[id=6,name=generate_series,alias=g,
+|       |       relation=generate_series,type=Int]
+|       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
++-output_attributes=
+  +-AttributeReference[id=9,name=,alias=(int_col+j),relation=,type=Int NULL]
+  +-AttributeReference[id=10,name=,alias=((int_col+j)*float_col),relation=,
+    type=Float NULL]
+==
+
+# Case expression
+SELECT long_col+1,
+       CASE WHEN int_col = 1 THEN (long_col+1)*(long_col+1)
+            WHEN int_col = 2 THEN (long_col+1)*(long_col+1)
+            ELSE long_col+1 END AS result
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=(long_col+1),relation=,type=Long]
+|   | +-Add
+|   |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|   |   +-Literal[value=1,type=Int]
+|   +-Alias[id=7,name=result,relation=,type=Long]
+|     +-SearchedCase
+|       +-else_result_expression=Add
+|       | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|       | +-Literal[value=1,type=Long]
+|       +-condition_perdicates=
+|       | +-Equal
+|       | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       | | +-Literal[value=1,type=Int]
+|       | +-Equal
+|       |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       |   +-Literal[value=2,type=Int]
+|       +-conditional_result_expressions=
+|         +-Multiply
+|         | +-Add
+|         | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|         | | +-Literal[value=1,type=Int]
+|         | +-Add
+|         |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|         |   +-Literal[value=1,type=Int]
+|         +-Multiply
+|           +-Add
+|           | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|           | +-Literal[value=1,type=Int]
+|           +-Add
+|             +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|             +-Literal[value=1,type=Int]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(long_col+1),relation=,type=Long]
+  +-AttributeReference[id=7,name=result,relation=,type=Long]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=TableReference[relation=Test,alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_expressions=
+|   +-Alias[id=6,name=,alias=(long_col+1),relation=,type=Long]
+|   | +-Add
+|   |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|   |   +-Literal[value=1,type=Int]
+|   +-Alias[id=7,name=result,relation=,type=Long]
+|     +-SearchedCase
+|       +-else_result_expression=Add
+|       | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|       | +-Literal[value=1,type=Long]
+|       +-condition_perdicates=
+|       | +-Equal
+|       | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       | | +-Literal[value=1,type=Int]
+|       | +-Equal
+|       |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       |   +-Literal[value=2,type=Int]
+|       +-conditional_result_expressions=
+|         +-Multiply
+|         | +-CommonSubexpression[common_subexpression_id=8]
+|         | | +-Operand=Add
+|         | |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|         | |   +-Literal[value=1,type=Int]
+|         | +-CommonSubexpression[common_subexpression_id=8]
+|         |   +-Operand=Add
+|         |     +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|         |     +-Literal[value=1,type=Int]
+|         +-Multiply
+|           +-CommonSubexpression[common_subexpression_id=9]
+|           | +-Operand=Add
+|           |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|           |   +-Literal[value=1,type=Int]
+|           +-CommonSubexpression[common_subexpression_id=9]
+|             +-Operand=Add
+|               +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+|               +-Literal[value=1,type=Int]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(long_col+1),relation=,type=Long]
+  +-AttributeReference[id=7,name=result,relation=,type=Long]
+==
+
+SELECT (int_col+1)*(int_col+2)*(int_col+3), (int_col+1)*(int_col+2), int_col+1
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=TableReference[relation_name=Test,relation_alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=(((int_col+1)*(int_col+2))*(int_col+3)),relation=,
+|   | type=Int NULL]
+|   | +-Multiply
+|   |   +-Multiply
+|   |   | +-Add
+|   |   | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   | | +-Literal[value=1,type=Int]
+|   |   | +-Add
+|   |   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   |   +-Literal[value=2,type=Int]
+|   |   +-Add
+|   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     +-Literal[value=3,type=Int]
+|   +-Alias[id=7,name=,alias=((int_col+1)*(int_col+2)),relation=,type=Int NULL]
+|   | +-Multiply
+|   |   +-Add
+|   |   | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   | +-Literal[value=1,type=Int]
+|   |   +-Add
+|   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     +-Literal[value=2,type=Int]
+|   +-Alias[id=8,name=,alias=(int_col+1),relation=,type=Int NULL]
+|     +-Add
+|       +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|       +-Literal[value=1,type=Int]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(((int_col+1)*(int_col+2))*(int_col+3)),
+  | relation=,type=Int NULL]
+  +-AttributeReference[id=7,name=,alias=((int_col+1)*(int_col+2)),relation=,
+  | type=Int NULL]
+  +-AttributeReference[id=8,name=,alias=(int_col+1),relation=,type=Int NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=TableReference[relation=Test,alias=test]
+| | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | +-AttributeReference[id=5,name=vchar_col,relation=test,type=VarChar(20) NULL]
+| +-project_expressions=
+|   +-Alias[id=6,name=,alias=(((int_col+1)*(int_col+2))*(int_col+3)),relation=,
+|   | type=Int NULL]
+|   | +-Multiply
+|   |   +-CommonSubexpression[common_subexpression_id=10]
+|   |   | +-Operand=Multiply
+|   |   |   +-CommonSubexpression[common_subexpression_id=9]
+|   |   |   | +-Operand=Add
+|   |   |   |   +-AttributeReference[id=0,name=int_col,relation=test,
+|   |   |   |   | type=Int NULL]
+|   |   |   |   +-Literal[value=1,type=Int]
+|   |   |   +-Add
+|   |   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |   |     +-Literal[value=2,type=Int]
+|   |   +-Add
+|   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     +-Literal[value=3,type=Int]
+|   +-Alias[id=7,name=,alias=((int_col+1)*(int_col+2)),relation=,type=Int NULL]
+|   | +-CommonSubexpression[common_subexpression_id=10]
+|   |   +-Operand=Multiply
+|   |     +-CommonSubexpression[common_subexpression_id=9]
+|   |     | +-Operand=Add
+|   |     |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |     |   +-Literal[value=1,type=Int]
+|   |     +-Add
+|   |       +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|   |       +-Literal[value=2,type=Int]
+|   +-Alias[id=8,name=,alias=(int_col+1),relation=,type=Int NULL]
+|     +-CommonSubexpression[common_subexpression_id=9]
+|       +-Operand=Add
+|         +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+|         +-Literal[value=1,type=Int]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=(((int_col+1)*(int_col+2))*(int_col+3)),
+  | relation=,type=Int NULL]
+  +-AttributeReference[id=7,name=,alias=((int_col+1)*(int_col+2)),relation=,
+  | type=Int NULL]
+  +-AttributeReference[id=8,name=,alias=(int_col+1),relation=,type=Int NULL]
+==
+
+# Reuse aggregate expressions
+SELECT SUM(long_col+1), AVG(1+long_col), MIN(float_col), MIN(float_col)*2, COUNT(*)
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=Aggregate
+| | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long NULL]
+| |   | +-AggregateFunction[function=SUM]
+| |   |   +-Add
+| |   |     +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   |     +-Literal[value=1,type=Int]
+| |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,type=Double NULL]
+| |   | +-AggregateFunction[function=AVG]
+| |   |   +-Add
+| |   |     +-Literal[value=1,type=Int]
+| |   |     +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,type=Float NULL]
+| |   | +-AggregateFunction[function=MIN]
+| |   |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,type=Float NULL]
+| |   | +-AggregateFunction[function=MIN]
+| |   |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   +-Alias[id=11,name=,alias=$aggregate4,relation=$aggregate,type=Long]
+| |     +-AggregateFunction[function=COUNT]
+| |       +-[]
+| +-project_list=
+|   +-Alias[id=6,name=,alias=SUM((long_col+1)),relation=,type=Long NULL]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Long NULL]
+|   +-Alias[id=7,name=,alias=AVG((1+long_col)),relation=,type=Double NULL]
+|   | +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+|   |   type=Double NULL]
+|   +-Alias[id=8,name=,alias=MIN(float_col),relation=,type=Float NULL]
+|   | +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+|   |   type=Float NULL]
+|   +-Alias[id=10,name=,alias=(MIN(float_col)*2),relation=,type=Float NULL]
+|   | +-Multiply
+|   |   +-AttributeReference[id=9,name=,alias=$aggregate3,relation=$aggregate,
+|   |   | type=Float NULL]
+|   |   +-Literal[value=2,type=Int]
+|   +-Alias[id=11,name=,alias=COUNT(*),relation=,type=Long]
+|     +-AttributeReference[id=11,name=,alias=$aggregate4,relation=$aggregate,
+|       type=Long]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=SUM((long_col+1)),relation=,
+  | type=Long NULL]
+  +-AttributeReference[id=7,name=,alias=AVG((1+long_col)),relation=,
+  | type=Double NULL]
+  +-AttributeReference[id=8,name=,alias=MIN(float_col),relation=,type=Float NULL]
+  +-AttributeReference[id=10,name=,alias=(MIN(float_col)*2),relation=,
+  | type=Float NULL]
+  +-AttributeReference[id=11,name=,alias=COUNT(*),relation=,type=Long]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=Aggregate
+| | +-input=TableReference[relation=Test,alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long NULL]
+| |   | +-AggregateFunction[function=SUM]
+| |   |   +-Add
+| |   |     +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   |     +-Literal[value=1,type=Int]
+| |   +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,type=Float NULL]
+| |   | +-AggregateFunction[function=MIN]
+| |   |   +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   +-Alias[id=11,name=,alias=$aggregate4,relation=$aggregate,type=Long]
+| |     +-AggregateFunction[function=COUNT]
+| |       +-[]
+| +-project_expressions=
+|   +-Alias[id=6,name=,alias=SUM((long_col+1)),relation=,type=Long NULL]
+|   | +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   type=Long NULL]
+|   +-Alias[id=7,name=,alias=AVG((1+long_col)),relation=,type=Long NULL]
+|   | +-Divide
+|   |   +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   | type=Long NULL]
+|   |   +-AttributeReference[id=11,name=,alias=$aggregate4,relation=$aggregate,
+|   |     type=Long]
+|   +-Alias[id=8,name=,alias=MIN(float_col),relation=,type=Float NULL]
+|   | +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+|   |   type=Float NULL]
+|   +-Alias[id=10,name=,alias=(MIN(float_col)*2),relation=,type=Float NULL]
+|   | +-Multiply
+|   |   +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+|   |   | type=Float NULL]
+|   |   +-Literal[value=2,type=Int]
+|   +-Alias[id=11,name=,alias=COUNT(*),relation=,type=Long]
+|     +-AttributeReference[id=11,name=,alias=$aggregate4,relation=$aggregate,
+|       type=Long]
++-output_attributes=
+  +-AttributeReference[id=6,name=,alias=SUM((long_col+1)),relation=,
+  | type=Long NULL]
+  +-AttributeReference[id=7,name=,alias=AVG((1+long_col)),relation=,
+  | type=Long NULL]
+  +-AttributeReference[id=8,name=,alias=MIN(float_col),relation=,type=Float NULL]
+  +-AttributeReference[id=10,name=,alias=(MIN(float_col)*2),relation=,
+  | type=Float NULL]
+  +-AttributeReference[id=11,name=,alias=COUNT(*),relation=,type=Long]
+==
+
+SELECT SUM(long_col+1)+2, (SUM(long_col+1)+2)*3
+FROM test;
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=Aggregate
+| | +-input=TableReference[relation_name=Test,relation_alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long NULL]
+| |   | +-AggregateFunction[function=SUM]
+| |   |   +-Add
+| |   |     +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   |     +-Literal[value=1,type=Int]
+| |   +-Alias[id=8,name=,alias=$aggregate1,relation=$aggregate,type=Long NULL]
+| |     +-AggregateFunction[function=SUM]
+| |       +-Add
+| |         +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |         +-Literal[value=1,type=Int]
+| +-project_list=
+|   +-Alias[id=7,name=,alias=(SUM((long_col+1))+2),relation=,type=Long NULL]
+|   | +-Add
+|   |   +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |   | type=Long NULL]
+|   |   +-Literal[value=2,type=Int]
+|   +-Alias[id=9,name=,alias=((SUM((long_col+1))+2)*3),relation=,type=Long NULL]
+|     +-Multiply
+|       +-Add
+|       | +-AttributeReference[id=8,name=,alias=$aggregate1,relation=$aggregate,
+|       | | type=Long NULL]
+|       | +-Literal[value=2,type=Int]
+|       +-Literal[value=3,type=Int]
++-output_attributes=
+  +-AttributeReference[id=7,name=,alias=(SUM((long_col+1))+2),relation=,
+  | type=Long NULL]
+  +-AttributeReference[id=9,name=,alias=((SUM((long_col+1))+2)*3),relation=,
+    type=Long NULL]
+[Physical Plan]
+TopLevelPlan
++-plan=Selection
+| +-input=Aggregate
+| | +-input=TableReference[relation=Test,alias=test]
+| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | |   type=VarChar(20) NULL]
+| | +-grouping_expressions=
+| | | +-[]
+| | +-aggregate_expressions=
+| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long NULL]
+| |     +-AggregateFunction[function=SUM]
+| |       +-Add
+| |         +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |         +-Literal[value=1,type=Int]
+| +-project_expressions=
+|   +-Alias[id=7,name=,alias=(SUM((long_col+1))+2),relation=,type=Long NULL]
+|   | +-CommonSubexpression[common_subexpression_id=10]
+|   |   +-Operand=Add
+|   |     +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+|   |     | type=Long NULL]
+|   |     +-Literal[value=2,type=Int]
+|   +-Alias[id=9,name=,alias=((SUM((long_col+1))+2)*3),relation=,type=Long NULL]
+|     +-Multiply
+|       +-CommonSubexpression[common_subexpression_id=10]
+|       | +-Operand=Add
+|       |   +-AttributeReference[id=6,name=,alias=$aggregate0,
+|       |   | relation=$aggregate,type=Long NULL]
+|       |   +-Literal[value=2,type=Int]
+|       +-Literal[value=3,type=Int]
++-output_attributes=
+  +-AttributeReference[id=7,name=,alias=(SUM((long_col+1))+2),relation=,
+  | type=Long NULL]
+  +-AttributeReference[id=9,name=,alias=((SUM((long_col+1))+2)*3),relation=,
+    type=Long NULL]
+==
diff --git a/query_optimizer/tests/physical_generator/Select.test b/query_optimizer/tests/physical_generator/Select.test
index f7de922..614347b 100644
--- a/query_optimizer/tests/physical_generator/Select.test
+++ b/query_optimizer/tests/physical_generator/Select.test
@@ -1022,34 +1022,51 @@
 [Physical Plan]
 TopLevelPlan
 +-plan=Selection
-| +-input=Aggregate
-| | +-input=TableReference[relation=Test,alias=test]
-| | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
-| | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
-| | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
-| | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
-| | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
-| | | +-AttributeReference[id=5,name=vchar_col,relation=test,
-| | |   type=VarChar(20) NULL]
-| | +-grouping_expressions=
-| | | +-[]
-| | +-aggregate_expressions=
-| |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long]
-| |   | +-AggregateFunction[function=COUNT]
-| |   |   +-[]
-| |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,type=Long]
-| |   | +-AggregateFunction[function=COUNT]
-| |   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
-| |   +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,type=Long NULL]
-| |   | +-AggregateFunction[function=SUM]
-| |   |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
-| |   +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,type=Double NULL]
-| |   | +-AggregateFunction[function=AVG]
-| |   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
-| |   +-Alias[id=11,name=,alias=$aggregate4,relation=$aggregate,type=Double NULL]
-| |     +-AggregateFunction[function=MAX]
-| |       +-AttributeReference[id=3,name=double_col,relation=test,
-| |         type=Double NULL]
+| +-input=Selection
+| | +-input=Aggregate
+| | | +-input=TableReference[relation=Test,alias=test]
+| | | | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | | | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | | | +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| | | | +-AttributeReference[id=3,name=double_col,relation=test,type=Double NULL]
+| | | | +-AttributeReference[id=4,name=char_col,relation=test,type=Char(20)]
+| | | | +-AttributeReference[id=5,name=vchar_col,relation=test,
+| | | |   type=VarChar(20) NULL]
+| | | +-grouping_expressions=
+| | | | +-[]
+| | | +-aggregate_expressions=
+| | |   +-Alias[id=6,name=,alias=$aggregate0,relation=$aggregate,type=Long]
+| | |   | +-AggregateFunction[function=COUNT]
+| | |   |   +-[]
+| | |   +-Alias[id=7,name=,alias=$aggregate1,relation=$aggregate,type=Long]
+| | |   | +-AggregateFunction[function=COUNT]
+| | |   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |   +-Alias[id=8,name=,alias=$aggregate2,relation=$aggregate,type=Long NULL]
+| | |   | +-AggregateFunction[function=SUM]
+| | |   |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| | |   +-Alias[id=12,name=,alias=$aggregate3,relation=$aggregate,type=Long NULL]
+| | |   | +-AggregateFunction[function=SUM]
+| | |   |   +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
+| | |   +-Alias[id=11,name=,alias=$aggregate4,relation=$aggregate,
+| | |     type=Double NULL]
+| | |     +-AggregateFunction[function=MAX]
+| | |       +-AttributeReference[id=3,name=double_col,relation=test,
+| | |         type=Double NULL]
+| | +-project_expressions=
+| |   +-AttributeReference[id=6,name=,alias=$aggregate0,relation=$aggregate,
+| |   | type=Long]
+| |   +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| |   | type=Long]
+| |   +-AttributeReference[id=8,name=,alias=$aggregate2,relation=$aggregate,
+| |   | type=Long NULL]
+| |   +-Alias[id=9,name=,alias=$aggregate3,relation=$aggregate,type=Long NULL]
+| |   | +-Divide
+| |   |   +-AttributeReference[id=12,name=,alias=$aggregate3,
+| |   |   | relation=$aggregate,type=Long NULL]
+| |   |   +-AttributeReference[id=7,name=,alias=$aggregate1,relation=$aggregate,
+| |   |     type=Long]
+| |   +-AttributeReference[id=11,name=,alias=$aggregate4,relation=$aggregate,
+| |     type=Double NULL]
 | +-filter_predicate=Greater
 | | +-Add
 | | | +-AttributeReference[id=11,name=,alias=$aggregate4,relation=$aggregate,
@@ -1311,31 +1328,40 @@
 | |   +-Alias[id=8,name=,alias=$aggregate0,relation=$aggregate,type=Long]
 | |   | +-AggregateFunction[function=COUNT]
 | |   |   +-Add
-| |   |     +-Add
-| |   |     | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
-| |   |     | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
-| |   |     +-Add
-| |   |       +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
-| |   |       +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |   |     +-CommonSubexpression[common_subexpression_id=13]
+| |   |     | +-Operand=Add
+| |   |     |   +-AttributeReference[id=0,name=int_col,relation=test,
+| |   |     |   | type=Int NULL]
+| |   |     |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   |     +-CommonSubexpression[common_subexpression_id=14]
+| |   |       +-Operand=Add
+| |   |         +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   |         +-AttributeReference[id=2,name=float_col,relation=test,
+| |   |           type=Float]
 | |   +-Alias[id=9,name=,alias=$aggregate1,relation=$aggregate,type=Long NULL]
 | |   | +-AggregateFunction[function=MAX]
 | |   |   +-Divide
-| |   |     +-Add
-| |   |     | +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
-| |   |     | +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   |     +-CommonSubexpression[common_subexpression_id=13]
+| |   |     | +-Operand=Add
+| |   |     |   +-AttributeReference[id=0,name=int_col,relation=test,
+| |   |     |   | type=Int NULL]
+| |   |     |   +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
 | |   |     +-Literal[value=2,type=Int]
 | |   +-Alias[id=11,name=,alias=$aggregate2,relation=$aggregate,type=Long NULL]
 | |   | +-AggregateFunction[function=MAX]
 | |   |   +-Add
 | |   |     +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
-| |   |     +-Add
-| |   |       +-AttributeReference[id=0,name=int_col,relation=test,type=Int NULL]
-| |   |       +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |   |     +-CommonSubexpression[common_subexpression_id=13]
+| |   |       +-Operand=Add
+| |   |         +-AttributeReference[id=0,name=int_col,relation=test,
+| |   |         | type=Int NULL]
+| |   |         +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
 | |   +-Alias[id=12,name=,alias=$aggregate3,relation=$aggregate,type=Double NULL]
 | |     +-AggregateFunction[function=SUM]
-| |       +-Add
-| |         +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
-| |         +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
+| |       +-CommonSubexpression[common_subexpression_id=14]
+| |         +-Operand=Add
+| |           +-AttributeReference[id=1,name=long_col,relation=test,type=Long]
+| |           +-AttributeReference[id=2,name=float_col,relation=test,type=Float]
 | +-filter_predicate=Greater
 | | +-AttributeReference[id=11,name=,alias=$aggregate2,relation=$aggregate,
 | | | type=Long NULL]
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 39538ea..a6a3cd0 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -282,6 +282,7 @@
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_ColumnVectorCache
                       quickstep_utility_Macros
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber
                       quickstep_utility_lipfilter_LIPFilterUtil
@@ -331,6 +332,7 @@
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
                       quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_ColumnVectorCache
                       quickstep_utility_Macros
                       tmb)
 target_link_libraries(quickstep_relationaloperators_RebuildWorkOrder
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index 0e75411..ea90828 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -50,6 +50,7 @@
 #include "types/TypedValue.hpp"
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/ColumnVectorCache.hpp"
 #include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 #include "utility/lip_filter/LIPFilterUtil.hpp"
 
@@ -532,6 +533,7 @@
     }
 
     ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ColumnVectorCache> cv_cache = std::make_unique<ColumnVectorCache>();
     for (auto selection_cit = selection_.begin();
          selection_cit != selection_.end();
          ++selection_cit) {
@@ -539,8 +541,10 @@
                                                                   build_accessor.get(),
                                                                   probe_relation_id,
                                                                   probe_accessor,
-                                                                  build_block_entry.second));
+                                                                  build_block_entry.second,
+                                                                  cv_cache.get()));
     }
+    cv_cache.reset();
 
     output_destination_->bulkInsertTuples(&temp_result);
   }
@@ -649,12 +653,14 @@
         zipped_joined_tuple_ids.emplace_back(build_tids[i], probe_tids[i]);
       }
 
+      ColumnVectorCache cv_cache;
       for (const Scalar *scalar : non_trivial_expressions) {
         temp_result.addColumn(scalar->getAllValuesForJoin(build_relation_id,
                                                           build_accessor.get(),
                                                           probe_relation_id,
                                                           probe_accessor,
-                                                          zipped_joined_tuple_ids));
+                                                          zipped_joined_tuple_ids,
+                                                          &cv_cache));
       }
     }
 
@@ -765,13 +771,16 @@
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_store.createValueAccessor(&filter));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ColumnVectorCache> cv_cache = std::make_unique<ColumnVectorCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end();
        ++selection_it) {
     temp_result.addColumn((*selection_it)->getAllValues(
-        probe_accessor_with_filter.get(), &sub_blocks_ref));
+        probe_accessor_with_filter.get(), &sub_blocks_ref, cv_cache.get()));
   }
+  cv_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -828,12 +837,15 @@
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ColumnVectorCache> cv_cache = std::make_unique<ColumnVectorCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end(); ++selection_it) {
     temp_result.addColumn((*selection_it)->getAllValues(
-        probe_accessor_with_filter.get(), &sub_blocks_ref));
+        probe_accessor_with_filter.get(), &sub_blocks_ref, cv_cache.get()));
   }
+  cv_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -886,12 +898,15 @@
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ColumnVectorCache> cv_cache = std::make_unique<ColumnVectorCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end(); ++selection_it) {
     temp_result.addColumn((*selection_it)->getAllValues(
-        probe_accessor_with_filter.get(), &sub_blocks_ref));
+        probe_accessor_with_filter.get(), &sub_blocks_ref, cv_cache.get()));
   }
+  cv_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -976,14 +991,18 @@
 
   std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
       probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
+
   ColumnVectorsValueAccessor temp_result;
+  std::unique_ptr<ColumnVectorCache> cv_cache = std::make_unique<ColumnVectorCache>();
   for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
        selection_it != selection_.end();
        ++selection_it) {
     temp_result.addColumn(
         (*selection_it)->getAllValues(probe_accessor_with_filter.get(),
-                                      &sub_blocks_ref));
+                                      &sub_blocks_ref,
+                                      cv_cache.get()));
   }
+  cv_cache.reset();
 
   output_destination_->bulkInsertTuples(&temp_result);
 }
@@ -1032,12 +1051,11 @@
            &build_block_entry : *collector.getJoinedTupleMap()) {
     const BlockReference build_block =
         storage_manager_->getBlock(build_block_entry.first, build_relation_);
-    const TupleStorageSubBlock &build_store =
-        build_block->getTupleStorageSubBlock();
+    const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
+    std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
 
-    std::unique_ptr<ValueAccessor> build_accessor(
-        build_store.createValueAccessor());
     ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ColumnVectorCache> cv_cache = std::make_unique<ColumnVectorCache>();
     for (auto selection_it = selection_.begin();
          selection_it != selection_.end();
          ++selection_it) {
@@ -1047,8 +1065,11 @@
               build_accessor.get(),
               probe_relation_id,
               probe_accessor.get(),
-              build_block_entry.second));
+              build_block_entry.second,
+              cv_cache.get()));
     }
+    cv_cache.reset();
+
     output_destination_->bulkInsertTuples(&temp_result);
   }
 
@@ -1061,8 +1082,9 @@
   if (num_tuples_without_matches > 0) {
     std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
         probe_accessor->createSharedTupleIdSequenceAdapterVirtual(*existence_map));
-    ColumnVectorsValueAccessor temp_result;
 
+    ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ColumnVectorCache> cv_cache = std::make_unique<ColumnVectorCache>();
     for (std::size_t i = 0; i < selection_.size(); ++i) {
       if (is_selection_on_build_[i]) {
         // NOTE(harshad, jianqiao): The assumption here is that any operation
@@ -1090,9 +1112,12 @@
       } else {
         temp_result.addColumn(
             selection_[i]->getAllValues(probe_accessor_with_filter.get(),
-                                        &sub_blocks_ref));
+                                        &sub_blocks_ref,
+                                        cv_cache.get()));
       }
     }
+    cv_cache.reset();
+
     output_destination_->bulkInsertTuples(&temp_result);
   }
 }
diff --git a/relational_operators/NestedLoopsJoinOperator.cpp b/relational_operators/NestedLoopsJoinOperator.cpp
index f17402f..4ef2a70 100644
--- a/relational_operators/NestedLoopsJoinOperator.cpp
+++ b/relational_operators/NestedLoopsJoinOperator.cpp
@@ -38,6 +38,7 @@
 #include "storage/TupleStorageSubBlock.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
+#include "utility/ColumnVectorCache.hpp"
 
 #include "glog/logging.h"
 
@@ -417,6 +418,7 @@
     // evaluation and data movement, but low enough that temporary memory
     // requirements don't get out of hand).
     ColumnVectorsValueAccessor temp_result;
+    std::unique_ptr<ColumnVectorCache> cv_cache = std::make_unique<ColumnVectorCache>();
     for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
          selection_cit != selection_.end();
          ++selection_cit) {
@@ -424,8 +426,10 @@
                                                                   left_accessor.get(),
                                                                   right_input_relation_id,
                                                                   right_accessor.get(),
-                                                                  joined_tuple_ids));
+                                                                  joined_tuple_ids,
+                                                                  cv_cache.get()));
     }
+    cv_cache.reset();
 
     output_destination_->bulkInsertTuples(&temp_result);
   }
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 90543c4..e5dc93e 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -57,6 +57,7 @@
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/containers/Tuple.hpp"
+#include "utility/ColumnVectorCache.hpp"
 #include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 
 #include "gflags/gflags.h"
@@ -491,9 +492,10 @@
     SubBlocksReference sub_blocks_ref(tuple_store,
                                       block->getIndices(),
                                       block->getIndicesConsistent());
+    ColumnVectorCache cv_cache;
     for (const auto &expression : non_trivial_expressions_) {
       non_trivial_results->addColumn(
-          expression->getAllValues(accessor, &sub_blocks_ref));
+          expression->getAllValues(accessor, &sub_blocks_ref, &cv_cache));
     }
   }
 
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index cb1f098..c3db584 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -297,6 +297,7 @@
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_types_containers_Tuple
+                      quickstep_utility_ColumnVectorCache
                       quickstep_utility_Macros
                       quickstep_utility_lipfilter_LIPFilterAdaptiveProber)
 target_link_libraries(quickstep_storage_AggregationOperationState_proto
@@ -961,6 +962,7 @@
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_types_containers_Tuple
                       quickstep_types_operations_comparisons_ComparisonUtil
+                      quickstep_utility_ColumnVectorCache
                       quickstep_utility_Macros
                       quickstep_utility_PtrVector)
 # CMAKE_VALIDATE_IGNORE_BEGIN
@@ -1100,6 +1102,7 @@
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_ColumnVectorUtil
                       quickstep_types_containers_ColumnVectorsValueAccessor
+                      quickstep_utility_ColumnVectorCache
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_WindowAggregationOperationState_proto
                       quickstep_expressions_Expressions_proto
diff --git a/storage/StorageBlock.cpp b/storage/StorageBlock.cpp
index e91c1ac..31f1db2 100644
--- a/storage/StorageBlock.cpp
+++ b/storage/StorageBlock.cpp
@@ -56,6 +56,7 @@
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/containers/Tuple.hpp"
 #include "types/operations/comparisons/ComparisonUtil.hpp"
+#include "utility/ColumnVectorCache.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -369,15 +370,18 @@
                                       indices_,
                                       indices_consistent_);
 
-    std::unique_ptr<ValueAccessor> accessor(
-        tuple_store_->createValueAccessor(filter));
+    std::unique_ptr<ValueAccessor> accessor(tuple_store_->createValueAccessor(filter));
+    ColumnVectorCache cv_cache;
 
     for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection.begin();
          selection_cit != selection.end();
          ++selection_cit) {
       // TODO(chasseur): Can probably elide some copies for parts of the
       // selection that are ScalarAttribute or ScalarLiteral.
-      temp_result.addColumn((*selection_cit)->getAllValues(accessor.get(), &sub_blocks_ref));
+      temp_result.addColumn(
+          (*selection_cit)->getAllValues(accessor.get(),
+                                         &sub_blocks_ref,
+                                         &cv_cache));
     }
   }
 
diff --git a/storage/WindowAggregationOperationState.cpp b/storage/WindowAggregationOperationState.cpp
index 58bdf18..30d91db 100644
--- a/storage/WindowAggregationOperationState.cpp
+++ b/storage/WindowAggregationOperationState.cpp
@@ -46,6 +46,7 @@
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "types/containers/ColumnVectorUtil.hpp"
+#include "utility/ColumnVectorCache.hpp"
 
 #include "glog/logging.h"
 
@@ -236,11 +237,16 @@
       argument_accessor = new ColumnVectorsValueAccessor();
     }
 
+    std::unique_ptr<ColumnVectorCache> cv_cache = std::make_unique<ColumnVectorCache>();
     for (const std::unique_ptr<const Scalar> &argument : arguments_) {
       argument_accessor->addColumn(argument->getAllValues(tuple_accessor,
-                                                          &sub_block_ref));
+                                                          &sub_block_ref,
+                                                          cv_cache.get()));
     }
 
+    // Release common subexpression cache as early as possible.
+    cv_cache.reset();
+
     InvokeOnAnyValueAccessor(tuple_accessor,
                              [&] (auto *tuple_accessor) -> void {  // NOLINT(build/c++11)
       tuple_accessor->beginIteration();
diff --git a/types/containers/CMakeLists.txt b/types/containers/CMakeLists.txt
index c2a6623..97841c2 100644
--- a/types/containers/CMakeLists.txt
+++ b/types/containers/CMakeLists.txt
@@ -42,8 +42,7 @@
                       quickstep_types_TypedValue
                       quickstep_types_containers_ColumnVector
                       quickstep_types_containers_Tuple
-                      quickstep_utility_Macros
-                      quickstep_utility_ScopedDeleter)
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_types_containers_Tuple
                       glog
                       quickstep_catalog_CatalogTypedefs
diff --git a/types/containers/ColumnVector.hpp b/types/containers/ColumnVector.hpp
index fc65656..5ef9871 100644
--- a/types/containers/ColumnVector.hpp
+++ b/types/containers/ColumnVector.hpp
@@ -43,6 +43,9 @@
 // TODO(chasseur): Look into ways to allocate ColumnVector memory from the
 // StorageManager.
 
+class ColumnVector;
+typedef std::shared_ptr<const ColumnVector> ColumnVectorPtr;
+
 /**
  * @brief A vector of values of the same type. Two implementations exist:
  *        NativeColumnVector (an array of fixed-size data elements) and
@@ -107,6 +110,13 @@
    **/
   virtual bool isNative() const = 0;
 
+  /**
+   * @brief Get the number of values in this ColumnVector.
+   *
+   * @return The number of values in this ColumnVector.
+   **/
+  virtual std::size_t size() const = 0;
+
  protected:
   const Type &type_;
 
@@ -176,7 +186,7 @@
    *
    * @return The number of values in this NativeColumnVector.
    **/
-  inline std::size_t size() const {
+  inline std::size_t size() const override {
     return actual_length_;
   }
 
@@ -436,7 +446,7 @@
    *
    * @return The number of values in this IndirectColumnVector.
    **/
-  inline std::size_t size() const {
+  inline std::size_t size() const override {
     return values_.size();
   }
 
diff --git a/types/containers/ColumnVectorsValueAccessor.hpp b/types/containers/ColumnVectorsValueAccessor.hpp
index 6dc1124..ebd46d4 100644
--- a/types/containers/ColumnVectorsValueAccessor.hpp
+++ b/types/containers/ColumnVectorsValueAccessor.hpp
@@ -32,7 +32,6 @@
 #include "types/containers/ColumnVector.hpp"
 #include "types/containers/Tuple.hpp"
 #include "utility/Macros.hpp"
-#include "utility/ScopedDeleter.hpp"
 
 #include "glog/logging.h"
 
@@ -74,22 +73,17 @@
    *             this value-accessor is responsible for freeing this column
    *             vector.
    **/
-  void addColumn(ColumnVector *column, const bool owns = true) {
+  void addColumn(ColumnVectorPtr column) {
     // If this is not the first column to be added, make sure it is the same
     // length as the others.
-    DCHECK(columns_.empty()
-           || (column->isNative()
-               ? (static_cast<const NativeColumnVector*>(column)->size() == column_length_)
-               : (static_cast<const IndirectColumnVector*>(column)->size() == column_length_)));
+    DCHECK(columns_.empty() || column->size() == column_length_);
     columns_.push_back(column);
     column_native_.push_back(column->isNative());
-    if (owns) {
-      deleter_.addObject(column);
-    }
-    column_length_
-        = column->isNative()
-          ? static_cast<const NativeColumnVector*>(column)->size()
-          : static_cast<const IndirectColumnVector*>(column)->size();
+    column_length_ = column->size();
+  }
+
+  void addColumn(ColumnVector *column) {
+    addColumn(ColumnVectorPtr(column));
   }
 
   inline void beginIteration() {
@@ -309,11 +303,10 @@
            && (static_cast<std::vector<ColumnVector*>::size_type>(attr_id) < columns_.size());
   }
 
-  std::vector<ColumnVector*> columns_;
+  std::vector<ColumnVectorPtr> columns_;
   std::vector<bool> column_native_;
   std::size_t column_length_;
   std::size_t current_position_;
-  ScopedDeleter deleter_;
 
   DISALLOW_COPY_AND_ASSIGN(ColumnVectorsValueAccessor);
 };
diff --git a/types/operations/binary_operations/AddBinaryOperation.hpp b/types/operations/binary_operations/AddBinaryOperation.hpp
index bc862bf..2309563 100644
--- a/types/operations/binary_operations/AddBinaryOperation.hpp
+++ b/types/operations/binary_operations/AddBinaryOperation.hpp
@@ -51,6 +51,10 @@
     return instance;
   }
 
+  bool isCommutative() const override {
+    return true;
+  }
+
   bool canApplyToTypes(const Type &left,
                        const Type &right) const override;
 
diff --git a/types/operations/binary_operations/BinaryOperation.hpp b/types/operations/binary_operations/BinaryOperation.hpp
index 585a1c6..bc8a083 100644
--- a/types/operations/binary_operations/BinaryOperation.hpp
+++ b/types/operations/binary_operations/BinaryOperation.hpp
@@ -335,6 +335,19 @@
   }
 
   /**
+   * @brief Whether this binary operation is commutative.
+   *
+   * @note The commutative property provides more optimization opportunities,
+   *       e.g. common subexpression elimination. Meanwhile it is always safe
+   *       to assume that a binary operation is not commutative.
+   *
+   * @return True if this binary operation is commutative; false otherwise.
+   */
+  virtual bool isCommutative() const {
+    return false;
+  }
+
+  /**
    * @brief Determine whether this BinaryOperation can apply to the specified
    *        Types.
    * @note When the Types that an operator can apply to are changed,
diff --git a/types/operations/binary_operations/MultiplyBinaryOperation.hpp b/types/operations/binary_operations/MultiplyBinaryOperation.hpp
index 6edc999..cc005e2 100644
--- a/types/operations/binary_operations/MultiplyBinaryOperation.hpp
+++ b/types/operations/binary_operations/MultiplyBinaryOperation.hpp
@@ -51,6 +51,10 @@
     return instance;
   }
 
+  bool isCommutative() const override {
+    return true;
+  }
+
   bool canApplyToTypes(const Type &left,
                        const Type &right) const override;
 
diff --git a/utility/CMakeLists.txt b/utility/CMakeLists.txt
index ca04462..e1fb770 100644
--- a/utility/CMakeLists.txt
+++ b/utility/CMakeLists.txt
@@ -171,6 +171,7 @@
 add_library(quickstep_utility_CalculateInstalledMemory CalculateInstalledMemory.cpp CalculateInstalledMemory.hpp)
 add_library(quickstep_utility_Cast ../empty_src.cpp Cast.hpp)
 add_library(quickstep_utility_CheckSnprintf ../empty_src.cpp CheckSnprintf.hpp)
+add_library(quickstep_utility_ColumnVectorCache ../empty_src.cpp ColumnVectorCache.hpp)
 add_library(quickstep_utility_CompositeHash ../empty_src.cpp CompositeHash.hpp)
 add_library(quickstep_utility_BarrieredReadWriteConcurrentBitVector
             ../empty_src.cpp
@@ -182,6 +183,7 @@
             ExecutionDAGVisualizer.cpp
             ExecutionDAGVisualizer.hpp)
 add_library(quickstep_utility_Glob Glob.cpp Glob.hpp)
+add_library(quickstep_utility_HashError ../empty_src.cpp HashError.hpp)
 add_library(quickstep_utility_HashPair ../empty_src.cpp HashPair.hpp)
 add_library(quickstep_utility_Macros ../empty_src.cpp Macros.hpp)
 add_library(quickstep_utility_MemStream ../empty_src.cpp MemStream.hpp)
@@ -237,6 +239,9 @@
                       glog)
 target_link_libraries(quickstep_utility_CheckSnprintf
                       glog)
+target_link_libraries(quickstep_utility_ColumnVectorCache
+                      quickstep_types_containers_ColumnVector
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_utility_CompositeHash
                       quickstep_types_TypedValue
                       quickstep_utility_HashPair
@@ -344,12 +349,14 @@
                       quickstep_utility_CalculateInstalledMemory
                       quickstep_utility_Cast
                       quickstep_utility_CheckSnprintf
+                      quickstep_utility_ColumnVectorCache
                       quickstep_utility_CompositeHash
                       quickstep_utility_DAG
                       quickstep_utility_DisjointTreeForest
                       quickstep_utility_EqualsAnyConstant
                       quickstep_utility_ExecutionDAGVisualizer
                       quickstep_utility_Glob
+                      quickstep_utility_HashError
                       quickstep_utility_HashPair
                       quickstep_utility_Macros
                       quickstep_utility_MemStream
diff --git a/utility/ColumnVectorCache.hpp b/utility/ColumnVectorCache.hpp
new file mode 100644
index 0000000..46e742c
--- /dev/null
+++ b/utility/ColumnVectorCache.hpp
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_COLUMN_VECTOR_CACHE_HPP_
+#define QUICKSTEP_UTILITY_COLUMN_VECTOR_CACHE_HPP_
+
+#include <unordered_map>
+
+#include "types/containers/ColumnVector.hpp"
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Expressions
+ *  @{
+ */
+
+/**
+ * @brief A memoization table for column vectors parameterized on integer IDs.
+ **/
+class ColumnVectorCache {
+ public:
+  /**
+   * @brief Constructor.
+   */
+  ColumnVectorCache() {}
+
+  /**
+   * @brief Check whether this cache contains the column vector parameterized on
+   *        the given ID.
+   *
+   * @param share_id The integer ID for the column vector.
+   * @return True if this cache contains the column vector; false otherwise.
+   */
+  inline bool contains(const int share_id) const {
+    return cv_cache_.find(share_id) != cv_cache_.end();
+  }
+
+  /**
+   * @brief Get the cached column vector parameterized on the given ID.
+   *
+   * @param share_id The integer ID for the column vector.
+   * @return The cached column vector.
+   */
+  inline ColumnVectorPtr get(const int share_id) const {
+    DCHECK(contains(share_id));
+    return cv_cache_.at(share_id);
+  }
+
+  /**
+   * @brief Cache the column vector with the given ID as parameter.
+   *
+   * @param share_id The integer ID for the column vector.
+   * @param cv The column vector to be cached.
+   */
+  inline void set(const int share_id, const ColumnVectorPtr &cv) {
+    DCHECK(!contains(share_id));
+    cv_cache_.emplace(share_id, cv);
+  }
+
+ private:
+  std::unordered_map<int, ColumnVectorPtr> cv_cache_;
+
+  DISALLOW_COPY_AND_ASSIGN(ColumnVectorCache);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_COLUMN_VECTOR_CACHE_HPP_
diff --git a/utility/HashError.hpp b/utility/HashError.hpp
new file mode 100644
index 0000000..1f7b1b8
--- /dev/null
+++ b/utility/HashError.hpp
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_UTILITY_HASH_ERROR_HPP_
+#define QUICKSTEP_UTILITY_HASH_ERROR_HPP_
+
+#include <exception>
+#include <string>
+
+namespace quickstep {
+
+/** \addtogroup Utility
+ *  @{
+ */
+
+/**
+ * @brief Exception thrown for non-supported hash().
+ **/
+class HashNotSupported : public std::exception {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param message The error message.
+   **/
+  explicit HashNotSupported(const std::string &message)
+      : message_(message) {}
+
+  /**
+   * @brief Destructor.
+   */
+  ~HashNotSupported() throw() {}
+
+  virtual const char* what() const throw() {
+    return message_.c_str();
+  }
+
+ private:
+  const std::string message_;
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_UTILITY_HASH_ERROR_HPP_