Merge branch 'master' into in-operator-support
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index eea94cd..ac41912 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -168,11 +168,13 @@
                       quickstep_storage_StorageBlock
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageManager
+                      quickstep_storage_SubBlocksReference
                       quickstep_storage_TupleReference
                       quickstep_storage_TupleStorageSubBlock
                       quickstep_storage_ValueAccessor
                       quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_utility_Macros
+                      quickstep_utility_PtrList
                       tmb)
 target_link_libraries(quickstep_relationaloperators_InsertOperator
                       glog
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index d947340..404006f 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -35,6 +35,7 @@
 #include "storage/StorageBlock.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
+#include "storage/SubBlocksReference.hpp"
 #include "storage/TupleReference.hpp"
 #include "storage/TupleStorageSubBlock.hpp"
 #include "storage/ValueAccessor.hpp"
@@ -85,7 +86,7 @@
   // Consolidation is a no-op for this version, but we provide this trivial
   // call so that MapBasedJoinedTupleCollector and
   // VectorBasedJoinedTupleCollector have the same interface and can both be
-  // used in the templated HashJoinWorkOrder::executeWithCollectorType() method.
+  // used in the templated HashInnerJoinWorkOrder::executeWithCollectorType() method.
   inline void consolidate() const {
   }
 
@@ -183,6 +184,57 @@
       consolidated_joined_tuples_;
 };
 
+class SemiAntiJoinTupleCollector {
+ public:
+  SemiAntiJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
+    filter_.reset(tuple_store.getExistenceMap());
+  }
+
+  template <typename ValueAccessorT>
+  inline void operator()(const ValueAccessorT &accessor) {
+    filter_->set(accessor.getCurrentPosition(), false);
+  }
+
+  const TupleIdSequence* filter() const {
+    return filter_.get();
+  }
+
+ private:
+  std::unique_ptr<TupleIdSequence> filter_;
+};
+
+class OuterJoinTupleCollector {
+ public:
+  OuterJoinTupleCollector(const TupleStorageSubBlock &tuple_store) {
+    filter_.reset(tuple_store.getExistenceMap());
+  }
+
+  template <typename ValueAccessorT>
+  inline void operator()(const ValueAccessorT &accessor,
+                         const TupleReference &tref) {
+    joined_tuples_[tref.block].emplace_back(tref.tuple, accessor.getCurrentPosition());
+  }
+
+  template <typename ValueAccessorT>
+  inline void recordMatch(const ValueAccessorT &accessor) {
+    filter_->set(accessor.getCurrentPosition(), false);
+  }
+
+  inline std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>>*
+      getJoinedTupleMap() {
+    return &joined_tuples_;
+  }
+
+  const TupleIdSequence* filter() const {
+    return filter_.get();
+  }
+
+ private:
+  std::unordered_map<block_id, std::vector<std::pair<tuple_id, tuple_id>>> joined_tuples_;
+  // BitVector on the probe relation. 1 if the corresponding tuple has no match.
+  std::unique_ptr<TupleIdSequence> filter_;
+};
+
 }  // namespace
 
 bool HashJoinOperator::getAllWorkOrders(
@@ -191,31 +243,57 @@
     StorageManager *storage_manager,
     const tmb::client_id foreman_client_id,
     tmb::MessageBus *bus) {
+  switch (join_type_) {
+    case JoinType::kInnerJoin:
+      return getAllNonOuterJoinWorkOrders<HashInnerJoinWorkOrder>(
+          container, query_context, storage_manager);
+    case JoinType::kLeftOuterJoin:
+      return getAllOuterJoinWorkOrders(container, query_context,
+                                       storage_manager);
+    case JoinType::kLeftSemiJoin:
+      return getAllNonOuterJoinWorkOrders<HashSemiJoinWorkOrder>(
+          container, query_context, storage_manager);
+    case JoinType::kLeftAntiJoin:
+      return getAllNonOuterJoinWorkOrders<HashAntiJoinWorkOrder>(
+          container, query_context, storage_manager);
+    default:
+      LOG(FATAL) << "Unknown join type in HashJoinOperator::getAllWorkOrders()";
+  }
+}
+
+template <class JoinWorkOrderClass>
+bool HashJoinOperator::getAllNonOuterJoinWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager) {
   // We wait until the building of global hash table is complete.
   if (blocking_dependencies_met_) {
     DCHECK(query_context != nullptr);
 
-    const Predicate *residual_predicate = query_context->getPredicate(residual_predicate_index_);
+    const Predicate *residual_predicate =
+        query_context->getPredicate(residual_predicate_index_);
     const vector<unique_ptr<const Scalar>> &selection =
-        query_context->getScalarGroup(selection_index_);
+        query_context->getScalarGroup(selection_on_probe_index_);
     InsertDestination *output_destination =
         query_context->getInsertDestination(output_destination_index_);
-    JoinHashTable *hash_table = query_context->getJoinHashTable(hash_table_index_);
+    const JoinHashTable &hash_table =
+        *(query_context->getJoinHashTable(hash_table_index_));
 
     if (probe_relation_is_stored_) {
       if (!started_) {
         for (const block_id probe_block_id : probe_relation_block_ids_) {
           container->addNormalWorkOrder(
-              new HashJoinWorkOrder(build_relation_,
-                                    probe_relation_,
-                                    join_key_attributes_,
-                                    any_join_key_attributes_nullable_,
-                                    probe_block_id,
-                                    residual_predicate,
-                                    selection,
-                                    output_destination,
-                                    hash_table,
-                                    storage_manager),
+              new JoinWorkOrderClass(
+                  build_relation_,
+                  probe_relation_,
+                  join_key_attributes_,
+                  any_join_key_attributes_nullable_,
+                  probe_block_id,
+                  selection,
+                  hash_table,
+                  residual_predicate,
+                  output_destination,
+                  storage_manager),
               op_index_);
         }
         started_ = true;
@@ -224,27 +302,99 @@
     } else {
       while (num_workorders_generated_ < probe_relation_block_ids_.size()) {
         container->addNormalWorkOrder(
-            new HashJoinWorkOrder(
+            new JoinWorkOrderClass(
                 build_relation_,
                 probe_relation_,
                 join_key_attributes_,
                 any_join_key_attributes_nullable_,
                 probe_relation_block_ids_[num_workorders_generated_],
-                residual_predicate,
                 selection,
-                output_destination,
                 hash_table,
+                residual_predicate,
+                output_destination,
                 storage_manager),
             op_index_);
         ++num_workorders_generated_;
-      }  // end while
+      }
       return done_feeding_input_relation_;
-    }  // end else (input_relation_is_stored is false)
-  }  // end if (blocking_dependencies_met)
+    }  // end else (probe_relation_is_stored_)
+  }  // end if (blocking_dependencies_met_)
   return false;
 }
 
-void HashJoinWorkOrder::execute() {
+bool HashJoinOperator::getAllOuterJoinWorkOrders(
+    WorkOrdersContainer *container,
+    QueryContext *query_context,
+    StorageManager *storage_manager) {
+  // We wait until the building of global hash table is complete.
+  if (blocking_dependencies_met_) {
+    DCHECK(query_context != nullptr);
+
+    const vector<unique_ptr<const Scalar>> &selection_on_probe =
+        query_context->getScalarGroup(selection_on_probe_index_);
+    const vector<unique_ptr<const Scalar>> &selection_on_build =
+        query_context->getScalarGroup(selection_on_build_index_);
+    InsertDestination *output_destination =
+        query_context->getInsertDestination(output_destination_index_);
+    const JoinHashTable &hash_table =
+        *(query_context->getJoinHashTable(hash_table_index_));
+
+    // TODO(harshad, jianqiao) Construct the vector below in ExecutionGenerator
+    // and pass it as an argument to the HashJoinOperator.
+    std::vector<const Type*> selection_on_build_types;
+    for (auto selection_on_build_it = selection_on_build.begin();
+         selection_on_build_it != selection_on_build.end();
+         ++selection_on_build_it) {
+      selection_on_build_types.emplace_back(
+          (&(*selection_on_build_it)->getType().getNullableVersion()));
+    }
+
+    if (probe_relation_is_stored_) {
+      if (!started_) {
+        for (const block_id probe_block_id : probe_relation_block_ids_) {
+          container->addNormalWorkOrder(
+              new HashOuterJoinWorkOrder(
+                  build_relation_,
+                  probe_relation_,
+                  join_key_attributes_,
+                  any_join_key_attributes_nullable_,
+                  hash_table,
+                  selection_on_probe,
+                  selection_on_build,
+                  selection_on_build_types,
+                  probe_block_id,
+                  output_destination,
+                  storage_manager),
+              op_index_);
+        }
+        started_ = true;
+      }
+      return started_;
+    } else {
+      while (num_workorders_generated_ < probe_relation_block_ids_.size()) {
+        container->addNormalWorkOrder(
+            new HashOuterJoinWorkOrder(
+                build_relation_,
+                probe_relation_,
+                join_key_attributes_,
+                any_join_key_attributes_nullable_,
+                hash_table,
+                selection_on_probe,
+                selection_on_build,
+                selection_on_build_types,
+                probe_relation_block_ids_[num_workorders_generated_],
+                output_destination,
+                storage_manager),
+            op_index_);
+        ++num_workorders_generated_;
+      }
+      return done_feeding_input_relation_;
+    }  // end else (probe_relation_is_stored_)
+  }  // end if (blocking_dependencies_met_)
+  return false;
+}
+
+void HashInnerJoinWorkOrder::execute() {
   if (FLAGS_vector_based_joined_tuple_collector) {
     executeWithCollectorType<VectorBasedJoinedTupleCollector>();
   } else {
@@ -253,7 +403,7 @@
 }
 
 template <typename CollectorT>
-void HashJoinWorkOrder::executeWithCollectorType() {
+void HashInnerJoinWorkOrder::executeWithCollectorType() {
   BlockReference probe_block(
       storage_manager_->getBlock(block_id_, probe_relation_));
   const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
@@ -261,13 +411,13 @@
   std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
   CollectorT collector;
   if (join_key_attributes_.size() == 1) {
-    hash_table_->getAllFromValueAccessor(
+    hash_table_.getAllFromValueAccessor(
         probe_accessor.get(),
         join_key_attributes_.front(),
         any_join_key_attributes_nullable_,
         &collector);
   } else {
-    hash_table_->getAllFromValueAccessorCompositeKey(
+    hash_table_.getAllFromValueAccessorCompositeKey(
         probe_accessor.get(),
         join_key_attributes_,
         any_join_key_attributes_nullable_,
@@ -348,4 +498,359 @@
   }
 }
 
+void HashSemiJoinWorkOrder::execute() {
+  if (residual_predicate_ == nullptr) {
+    executeWithoutResidualPredicate();
+  } else {
+    executeWithResidualPredicate();
+  }
+}
+
+void HashSemiJoinWorkOrder::executeWithResidualPredicate() {
+  const relation_id build_relation_id = build_relation_.getID();
+  const relation_id probe_relation_id = probe_relation_.getID();
+
+  BlockReference probe_block = storage_manager_->getBlock(block_id_,
+                                                          probe_relation_);
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+
+  // TODO(harshad) - Make this function work with both types of collectors.
+
+  // We collect all the matching probe relation tuples, as there's a residual
+  // preidcate that needs to be applied after collecting these matches.
+  MapBasedJoinedTupleCollector collector;
+  if (join_key_attributes_.size() == 1) {
+    hash_table_.getAllFromValueAccessor(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    hash_table_.getAllFromValueAccessorCompositeKey(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  // Get a filter for tuples in the given probe block.
+  TupleIdSequence filter(probe_store.getMaxTupleID() + 1);
+  filter.setRange(0, filter.length(), false);
+  for (const std::pair<const block_id,
+                       std::vector<std::pair<tuple_id, tuple_id>>>
+           &build_block_entry : *collector.getJoinedTuples()) {
+    // First element of the pair build_block_entry is the build block ID
+    // 2nd element of the pair is a vector of pairs, in each of which -
+    // 1st element is a matching tuple ID from the inner (build) relation.
+    // 2nd element is a matching tuple ID from the outer (probe) relation.
+
+    // Get the block from the build relation for this pair of matched tuples.
+    BlockReference build_block =
+        storage_manager_->getBlock(build_block_entry.first, build_relation_);
+    const TupleStorageSubBlock &build_store =
+        build_block->getTupleStorageSubBlock();
+    std::unique_ptr<ValueAccessor> build_accessor(
+        build_store.createValueAccessor());
+    for (const std::pair<tuple_id, tuple_id> &hash_match
+         : build_block_entry.second) {
+      // For each pair, 1st element is a tuple ID from the build relation in the
+      // given build block, 2nd element is a tuple ID from the probe relation.
+      if (filter.get(hash_match.second)) {
+        // We have already found matches for this tuple that belongs to the
+        // probe side, skip it.
+        continue;
+      }
+      if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
+                                                      build_relation_id,
+                                                      hash_match.first,
+                                                      *probe_accessor,
+                                                      probe_relation_id,
+                                                      hash_match.second)) {
+        filter.set(hash_match.second, true);
+      }
+    }
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+      probe_store.createValueAccessor(&filter));
+  ColumnVectorsValueAccessor temp_result;
+  for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+       selection_it != selection_.end();
+       ++selection_it) {
+    temp_result.addColumn((*selection_it)->getAllValues(
+        probe_accessor_with_filter.get(), &sub_blocks_ref));
+  }
+
+  output_destination_->bulkInsertTuples(&temp_result);
+}
+
+void HashSemiJoinWorkOrder::executeWithoutResidualPredicate() {
+  DCHECK(residual_predicate_ == nullptr);
+
+  BlockReference probe_block = storage_manager_->getBlock(block_id_,
+                                                          probe_relation_);
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+  SemiAntiJoinTupleCollector collector(probe_store);
+  // We collect all the probe relation tuples which have at least one matching
+  // tuple in the build relation. As a performance optimization, the hash table
+  // just looks for the existence of the probing key in the hash table and sets
+  // the bit for the probing key in the collector. The optimization works
+  // because there is no residual predicate in this case, unlike
+  // executeWithResidualPredicate().
+  if (join_key_attributes_.size() == 1u) {
+    // Call the collector to set the bit to 0 for every key without a match.
+    hash_table_.runOverKeysFromValueAccessorIfMatchNotFound(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    // Call the collector to set the bit to 0 for every key without a match.
+    hash_table_.runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+      probe_store.createValueAccessor(collector.filter()));
+  ColumnVectorsValueAccessor temp_result;
+  for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+       selection_it != selection_.end(); ++selection_it) {
+    temp_result.addColumn((*selection_it)->getAllValues(
+        probe_accessor_with_filter.get(), &sub_blocks_ref));
+  }
+
+  output_destination_->bulkInsertTuples(&temp_result);
+}
+
+void HashAntiJoinWorkOrder::executeWithoutResidualPredicate() {
+  DEBUG_ASSERT(residual_predicate_ == nullptr);
+
+  BlockReference probe_block = storage_manager_->getBlock(block_id_,
+                                                          probe_relation_);
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+  SemiAntiJoinTupleCollector collector(probe_store);
+  // We probe the hash table to find the keys which have an entry in the
+  // hash table.
+  if (join_key_attributes_.size() == 1) {
+    // Call the collector to set the bit to 0 for every key with a match.
+    hash_table_.runOverKeysFromValueAccessorIfMatchFound(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    // Call the collector to set the bit to 0 for every key with a match.
+    hash_table_.runOverKeysFromValueAccessorIfMatchFoundCompositeKey(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+      probe_store.createValueAccessor(collector.filter()));
+  ColumnVectorsValueAccessor temp_result;
+  for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+       selection_it != selection_.end(); ++selection_it) {
+    temp_result.addColumn((*selection_it)->getAllValues(
+        probe_accessor_with_filter.get(), &sub_blocks_ref));
+  }
+
+  output_destination_->bulkInsertTuples(&temp_result);
+}
+
+void HashAntiJoinWorkOrder::executeWithResidualPredicate() {
+  const relation_id build_relation_id = build_relation_.getID();
+  const relation_id probe_relation_id = probe_relation_.getID();
+
+  BlockReference probe_block = storage_manager_->getBlock(block_id_,
+                                                          probe_relation_);
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+  // TODO(harshad) - Make the following code work with both types of collectors.
+  MapBasedJoinedTupleCollector collector;
+  // We probe the hash table and get all the matches. Unlike
+  // executeWithoutResidualPredicate(), we have to collect all the matching
+  // tuples, because after this step we still have to evalute the residual
+  // predicate.
+  if (join_key_attributes_.size() == 1) {
+    hash_table_.getAllFromValueAccessor(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    hash_table_.getAllFromValueAccessorCompositeKey(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  // Create a filter for all the tuples from the given probe block.
+  std::unique_ptr<TupleIdSequence> filter(probe_store.getExistenceMap());
+  for (const std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
+           &build_block_entry : *collector.getJoinedTuples()) {
+    // First element of the pair build_block_entry is the build block ID
+    // 2nd element of the pair is a vector of pairs, in each of which -
+    // 1st element is a matching tuple ID from the inner (build) relation.
+    // 2nd element is a matching tuple ID from the outer (probe) relation.
+    BlockReference build_block = storage_manager_->getBlock(build_block_entry.first,
+                                                            build_relation_);
+    const TupleStorageSubBlock &build_store = build_block->getTupleStorageSubBlock();
+    std::unique_ptr<ValueAccessor> build_accessor(build_store.createValueAccessor());
+    for (const std::pair<tuple_id, tuple_id> &hash_match
+         : build_block_entry.second) {
+      if (!filter->get(hash_match.second)) {
+        // We have already seen this tuple, skip it.
+        continue;
+      }
+      if (residual_predicate_->matchesForJoinedTuples(*build_accessor,
+                                                      build_relation_id,
+                                                      hash_match.first,
+                                                      *probe_accessor,
+                                                      probe_relation_id,
+                                                      hash_match.second)) {
+        // Note that the filter marks a match as false, as needed by the anti
+        // join definition.
+        filter->set(hash_match.second, false);
+      }
+    }
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+      probe_store.createValueAccessor(filter.get()));
+  ColumnVectorsValueAccessor temp_result;
+  for (vector<unique_ptr<const Scalar>>::const_iterator selection_it = selection_.begin();
+       selection_it != selection_.end();
+       ++selection_it) {
+    temp_result.addColumn((*selection_it)->getAllValues(probe_accessor_with_filter.get(),
+                                                     &sub_blocks_ref));
+  }
+
+  output_destination_->bulkInsertTuples(&temp_result);
+}
+
+void HashOuterJoinWorkOrder::execute() {
+  const relation_id build_relation_id = build_relation_.getID();
+  const relation_id probe_relation_id = probe_relation_.getID();
+
+  BlockReference probe_block = storage_manager_->getBlock(block_id_,
+                                                          probe_relation_);
+  const TupleStorageSubBlock &probe_store = probe_block->getTupleStorageSubBlock();
+
+  std::unique_ptr<ValueAccessor> probe_accessor(probe_store.createValueAccessor());
+  OuterJoinTupleCollector collector(probe_store);
+  if (join_key_attributes_.size() == 1) {
+    hash_table_.getAllFromValueAccessorWithExtraWorkForFirstMatch(
+        probe_accessor.get(),
+        join_key_attributes_.front(),
+        any_join_key_attributes_nullable_,
+        &collector);
+  } else {
+    hash_table_.getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(
+        probe_accessor.get(),
+        join_key_attributes_,
+        any_join_key_attributes_nullable_,
+        &collector);
+  }
+
+  for (const std::pair<const block_id, std::vector<std::pair<tuple_id, tuple_id>>>
+           &build_block_entry : *collector.getJoinedTupleMap()) {
+    BlockReference build_block =
+        storage_manager_->getBlock(build_block_entry.first, build_relation_);
+    const TupleStorageSubBlock &build_store =
+        build_block->getTupleStorageSubBlock();
+
+    std::unique_ptr<ValueAccessor> build_accessor(
+        build_store.createValueAccessor());
+    ColumnVectorsValueAccessor temp_result;
+    for (auto selection_it = selection_on_probe_.begin();
+         selection_it != selection_on_probe_.end();
+         ++selection_it) {
+      temp_result.addColumn(
+          (*selection_it)->getAllValuesForJoin(
+              build_relation_id,
+              build_accessor.get(),
+              probe_relation_id,
+              probe_accessor.get(),
+              build_block_entry.second));
+    }
+    for (auto selection_it = selection_on_build_.begin();
+         selection_it != selection_on_build_.end();
+         ++selection_it) {
+      temp_result.addColumn(
+          (*selection_it)->getAllValuesForJoin(
+              build_relation_id,
+              build_accessor.get(),
+              probe_relation_id,
+              probe_accessor.get(),
+              build_block_entry.second));
+    }
+
+    output_destination_->bulkInsertTuples(&temp_result);
+  }
+
+  SubBlocksReference sub_blocks_ref(probe_store,
+                                    probe_block->getIndices(),
+                                    probe_block->getIndicesConsistent());
+
+  // Populate the output tuples for non-matches.
+  const TupleIdSequence *filter = collector.filter();
+  const TupleIdSequence::size_type num_tuples_without_matches = filter->size();
+  if (num_tuples_without_matches > 0) {
+    std::unique_ptr<ValueAccessor> probe_accessor_with_filter(
+        probe_store.createValueAccessor(filter));
+    ColumnVectorsValueAccessor temp_result;
+    for (auto selection_it = selection_on_probe_.begin();
+         selection_it != selection_on_probe_.end();
+         ++selection_it) {
+      temp_result.addColumn(
+          (*selection_it)->getAllValues(probe_accessor_with_filter.get(),
+                                        &sub_blocks_ref));
+    }
+
+    for (const Type *selection_on_build_type : selection_on_build_types_) {
+      if (NativeColumnVector::UsableForType(*selection_on_build_type)) {
+        NativeColumnVector *result = new NativeColumnVector(
+            *selection_on_build_type, num_tuples_without_matches);
+        result->fillWithNulls();
+        temp_result.addColumn(result);
+      } else {
+        IndirectColumnVector *result = new IndirectColumnVector(
+            *selection_on_build_type, num_tuples_without_matches);
+        result->fillWithValue(TypedValue(selection_on_build_type->getTypeID()));
+        temp_result.addColumn(result);
+      }
+    }
+    output_destination_->bulkInsertTuples(&temp_result);
+  }
+}
+
 }  // namespace quickstep
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 9b8c87b..4141e14 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -30,6 +30,7 @@
 #include "storage/HashTable.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "utility/Macros.hpp"
+#include "utility/PtrList.hpp"
 
 #include "glog/logging.h"
 
@@ -51,11 +52,18 @@
  */
 
 /**
- * @brief An operator which performs a hash-join on two
- *        relations.
+ * @brief An operator which performs a hash-join, including inner-join,
+ *        semi-join, anti-join and outer-join on two relations.
  **/
 class HashJoinOperator : public RelationalOperator {
  public:
+  enum class JoinType {
+    kInnerJoin = 0,
+    kLeftOuterJoin,
+    kLeftSemiJoin,
+    kLeftAntiJoin
+  };
+
   /**
    * @brief Constructor.
    *
@@ -93,35 +101,48 @@
    *        additional filter to pairs of tuples that match the hash-join (i.e.
    *        key equality) predicate. Effectively, this makes the join predicate
    *        the conjunction of the key-equality predicate and residual predicate.
-   * @param selection_index The group index of Scalars in QueryContext,
-   *        corresponding to the attributes of the relation referred by
+   *        Note that this field is not relevant for anti-join.
+   * @param selection_on_probe_index The group index of Scalars in QueryContext,
+   *        corresponding to the attributes of the probe relation referred by
    *        output_relation_id. Each Scalar is evaluated for the joined tuples,
    *        and the resulting value is inserted into the join result.
+   * @param selection_on_build_index The group index of Scalars in QueryContext,
+   *        corresponding to the attributes of the build relation referred by
+   *        output_relation_id. Each Scalar is evaluated for the joined tuples,
+   *        and the resulting value is inserted into the join result.
+   * @param join_type The type of join corresponding to this operator.
    **/
-  HashJoinOperator(const CatalogRelation &build_relation,
-                   const CatalogRelation &probe_relation,
-                   const bool probe_relation_is_stored,
-                   const std::vector<attribute_id> &join_key_attributes,
-                   const bool any_join_key_attributes_nullable,
-                   const CatalogRelation &output_relation,
-                   const QueryContext::insert_destination_id output_destination_index,
-                   const QueryContext::join_hash_table_id hash_table_index,
-                   const QueryContext::predicate_id residual_predicate_index,
-                   const QueryContext::scalar_group_id selection_index)
-    : build_relation_(build_relation),
-      probe_relation_(probe_relation),
-      probe_relation_is_stored_(probe_relation_is_stored),
-      join_key_attributes_(join_key_attributes),
-      any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-      output_relation_(output_relation),
-      output_destination_index_(output_destination_index),
-      hash_table_index_(hash_table_index),
-      residual_predicate_index_(residual_predicate_index),
-      selection_index_(selection_index),
-      probe_relation_block_ids_(probe_relation_is_stored ? probe_relation.getBlocksSnapshot()
-                                                         : std::vector<block_id>()),
-      num_workorders_generated_(0),
-      started_(false) {}
+  HashJoinOperator(
+      const CatalogRelation &build_relation,
+      const CatalogRelation &probe_relation,
+      const bool probe_relation_is_stored,
+      const std::vector<attribute_id> &join_key_attributes,
+      const bool any_join_key_attributes_nullable,
+      const CatalogRelation &output_relation,
+      const QueryContext::insert_destination_id output_destination_index,
+      const QueryContext::join_hash_table_id hash_table_index,
+      const QueryContext::predicate_id residual_predicate_index,
+      const QueryContext::scalar_group_id selection_on_probe_index,
+      const QueryContext::scalar_group_id selection_on_build_index =
+          QueryContext::kInvalidScalarGroupId,
+      const JoinType join_type = JoinType::kInnerJoin)
+      : build_relation_(build_relation),
+        probe_relation_(probe_relation),
+        probe_relation_is_stored_(probe_relation_is_stored),
+        join_key_attributes_(join_key_attributes),
+        any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+        output_relation_(output_relation),
+        output_destination_index_(output_destination_index),
+        hash_table_index_(hash_table_index),
+        residual_predicate_index_(residual_predicate_index),
+        selection_on_probe_index_(selection_on_probe_index),
+        selection_on_build_index_(selection_on_build_index),
+        join_type_(join_type),
+        probe_relation_block_ids_(probe_relation_is_stored
+                                      ? probe_relation.getBlocksSnapshot()
+                                      : std::vector<block_id>()),
+        num_workorders_generated_(0),
+        started_(false) {}
 
   ~HashJoinOperator() override {}
 
@@ -163,6 +184,15 @@
   }
 
  private:
+  template <class JoinWorkOrderClass>
+  bool getAllNonOuterJoinWorkOrders(WorkOrdersContainer *container,
+                                    QueryContext *query_context,
+                                    StorageManager *storage_manager);
+
+  bool getAllOuterJoinWorkOrders(WorkOrdersContainer *container,
+                                 QueryContext *query_context,
+                                 StorageManager *storage_manager);
+
   const CatalogRelation &build_relation_;
   const CatalogRelation &probe_relation_;
   const bool probe_relation_is_stored_;
@@ -172,7 +202,9 @@
   const QueryContext::insert_destination_id output_destination_index_;
   const QueryContext::join_hash_table_id hash_table_index_;
   const QueryContext::predicate_id residual_predicate_index_;
-  const QueryContext::scalar_group_id selection_index_;
+  const QueryContext::scalar_group_id selection_on_probe_index_;
+  const QueryContext::scalar_group_id selection_on_build_index_;
+  const JoinType join_type_;
 
   std::vector<block_id> probe_relation_block_ids_;
   std::size_t num_workorders_generated_;
@@ -183,9 +215,9 @@
 };
 
 /**
- * @brief A WorkOrder produced by HashJoinOperator.
+ * @brief An inner join WorkOrder produced by HashJoinOperator.
  **/
-class HashJoinWorkOrder : public WorkOrder {
+class HashInnerJoinWorkOrder : public WorkOrder {
  public:
   /**
    * @brief Constructor.
@@ -198,26 +230,26 @@
    *        probe_relation.
    * @param any_join_key_attributes_nullable If any attribute is nullable.
    * @param lookup_block_id The block id of the probe_relation.
+   * @param selection A list of Scalars corresponding to the relation attributes
+   *        in \c output_destination. Each Scalar is evaluated for the joined
+   *        tuples, and the resulting value is inserted into the join result.
+   * @param hash_table The JoinHashTable to use.
    * @param residual_predicate If non-null, apply as an additional filter to
    *        pairs of tuples that match the hash-join (i.e. key equality)
    *        predicate. Effectively, this makes the join predicate the
    *        conjunction of the key-equality predicate and residual_predicate.
-   * @param selection A list of Scalars corresponding to the relation attributes
-   *        in \c output_destination. Each Scalar is evaluated for the joined
-   *        tuples, and the resulting value is inserted into the join result.
    * @param output_destination The InsertDestination to insert the join results.
-   * @param hash_table The JoinHashTable to use.
    * @param storage_manager The StorageManager to use.
    **/
-  HashJoinWorkOrder(const CatalogRelationSchema &build_relation,
+  HashInnerJoinWorkOrder(const CatalogRelationSchema &build_relation,
                     const CatalogRelationSchema &probe_relation,
                     const std::vector<attribute_id> &join_key_attributes,
                     const bool any_join_key_attributes_nullable,
                     const block_id lookup_block_id,
-                    const Predicate *residual_predicate,
                     const std::vector<std::unique_ptr<const Scalar>> &selection,
+                    const JoinHashTable &hash_table,
+                    const Predicate *residual_predicate,
                     InsertDestination *output_destination,
-                    JoinHashTable *hash_table,
                     StorageManager *storage_manager)
       : build_relation_(build_relation),
         probe_relation_(probe_relation),
@@ -226,11 +258,11 @@
         block_id_(lookup_block_id),
         residual_predicate_(residual_predicate),
         selection_(selection),
+        hash_table_(hash_table),
         output_destination_(DCHECK_NOTNULL(output_destination)),
-        hash_table_(DCHECK_NOTNULL(hash_table)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
 
-  ~HashJoinWorkOrder() override {}
+  ~HashInnerJoinWorkOrder() override {}
 
   /**
    * @exception TupleTooLargeForBlock A tuple produced by this join was too
@@ -253,11 +285,233 @@
   const Predicate *residual_predicate_;
   const std::vector<std::unique_ptr<const Scalar>> &selection_;
 
+  const JoinHashTable &hash_table_;
   InsertDestination *output_destination_;
-  JoinHashTable *hash_table_;
   StorageManager *storage_manager_;
 
-  DISALLOW_COPY_AND_ASSIGN(HashJoinWorkOrder);
+  DISALLOW_COPY_AND_ASSIGN(HashInnerJoinWorkOrder);
+};
+
+/**
+ * @brief A left semi-join WorkOrder produced by the HashJoinOperator to execute
+ *        EXISTS() clause.
+ **/
+class HashSemiJoinWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param build_relation The relation that the hash table was originally built
+   *        on (i.e. the inner relation in the join).
+   * @param probe_relation The relation to probe the hash table with (i.e. the
+   *        outer relation in the join).
+   * @param join_key_attributes The IDs of equijoin attributes in \c
+   *        probe_relation.
+   * @param any_join_key_attributes_nullable If any attribute is nullable.
+   * @param lookup_block_id The block id of the probe_relation.
+   * @param selection A list of Scalars corresponding to the relation attributes
+   *        in \c output_destination. Each Scalar is evaluated for the joined
+   *        tuples, and the resulting value is inserted into the join result.
+   * @param hash_table The JoinHashTable to use.
+   * @param residual_predicate If non-null, apply as an additional filter to
+   *        pairs of tuples that match the hash-join (i.e. key equality)
+   *        predicate. Effectively, this makes the join predicate the
+   *        conjunction of the key-equality predicate and residual_predicate.
+   * @param output_destination The InsertDestination to insert the join results.
+   * @param storage_manager The StorageManager to use.
+   **/
+  HashSemiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+                        const CatalogRelationSchema &probe_relation,
+                        const std::vector<attribute_id> &join_key_attributes,
+                        const bool any_join_key_attributes_nullable,
+                        const block_id lookup_block_id,
+                        const std::vector<std::unique_ptr<const Scalar>> &selection,
+                        const JoinHashTable &hash_table,
+                        const Predicate *residual_predicate,
+                        InsertDestination *output_destination,
+                        StorageManager *storage_manager)
+     :  build_relation_(build_relation),
+        probe_relation_(probe_relation),
+        join_key_attributes_(join_key_attributes),
+        any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+        hash_table_(hash_table),
+        selection_(selection),
+        residual_predicate_(residual_predicate),
+        block_id_(lookup_block_id),
+        output_destination_(output_destination),
+        storage_manager_(storage_manager) {}
+
+  ~HashSemiJoinWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  void executeWithoutResidualPredicate();
+
+  void executeWithResidualPredicate();
+
+  const CatalogRelationSchema &build_relation_;
+  const CatalogRelationSchema &probe_relation_;
+  const std::vector<attribute_id> join_key_attributes_;
+  const bool any_join_key_attributes_nullable_;
+  const JoinHashTable &hash_table_;
+  const std::vector<std::unique_ptr<const Scalar>> &selection_;
+  const Predicate *residual_predicate_;
+  const block_id block_id_;
+
+  InsertDestination *output_destination_;
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashSemiJoinWorkOrder);
+};
+
+/**
+ * @brief A left anti-join WorkOrder produced by the HashJoinOperator to execute
+ *        NOT EXISTS() clause.
+ **/
+class HashAntiJoinWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   * TODO(harshad) - Sync the doxygen.
+   *
+   * @param build_relation The relation that the hash table was originally built
+   *        on (i.e. the inner relation in the join).
+   * @param probe_relation The relation to probe the hash table with (i.e. the
+   *        outer relation in the join).
+   * @param join_key_attributes The IDs of equijoin attributes in \c
+   *        probe_relation.
+   * @param any_join_key_attributes_nullable If any attribute is nullable.
+   * @param lookup_block_id The block id of the probe_relation.
+   * @param selection A list of Scalars corresponding to the relation attributes
+   *        in \c output_destination. Each Scalar is evaluated for the joined
+   *        tuples, and the resulting value is inserted into the join result.
+   * @param hash_table The JoinHashTable to use.
+   * @param residual_predicate If non-null, apply as an additional filter to
+   *        pairs of tuples that match the hash-join (i.e. key equality)
+   *        predicate. Effectively, this makes the join predicate the
+   *        conjunction of the key-equality predicate and residual_predicate.
+   * @param output_destination The InsertDestination to insert the join results.
+   * @param storage_manager The StorageManager to use.
+   **/
+  HashAntiJoinWorkOrder(const CatalogRelationSchema &build_relation,
+                        const CatalogRelationSchema &probe_relation,
+                        const std::vector<attribute_id> &join_key_attributes,
+                        const bool any_join_key_attributes_nullable,
+                        const block_id lookup_block_id,
+                        const std::vector<std::unique_ptr<const Scalar>> &selection,
+                        const JoinHashTable &hash_table,
+                        const Predicate *residual_predicate,
+                        InsertDestination *output_destination,
+                        StorageManager *storage_manager)
+      : build_relation_(build_relation),
+        probe_relation_(probe_relation),
+        join_key_attributes_(join_key_attributes),
+        any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+        selection_(selection),
+        residual_predicate_(residual_predicate),
+        block_id_(lookup_block_id),
+        hash_table_(hash_table),
+        output_destination_(output_destination),
+        storage_manager_(storage_manager) {}
+
+  ~HashAntiJoinWorkOrder() override {}
+
+  void execute() override {
+    if (residual_predicate_ == nullptr) {
+      executeWithoutResidualPredicate();
+    } else {
+      executeWithResidualPredicate();
+    }
+  }
+
+ private:
+  void executeWithoutResidualPredicate();
+
+  void executeWithResidualPredicate();
+
+  const CatalogRelationSchema &build_relation_;
+  const CatalogRelationSchema &probe_relation_;
+  const std::vector<attribute_id> join_key_attributes_;
+  const bool any_join_key_attributes_nullable_;
+  const std::vector<std::unique_ptr<const Scalar>> &selection_;
+  const Predicate *residual_predicate_;
+  const block_id block_id_;
+  const JoinHashTable &hash_table_;
+
+  InsertDestination *output_destination_;
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashAntiJoinWorkOrder);
+};
+
+/**
+ * @brief A left outer join WorkOrder produced by the HashJoinOperator.
+ **/
+ class HashOuterJoinWorkOrder : public WorkOrder {
+ public:
+  /**
+   * @brief Constructor.
+   *
+   * @param build_relation The relation that the hash table was originally built
+   *        on (i.e. the inner relation in the join).
+   * @param probe_relation The relation to probe the hash table with (i.e. the
+   *        outer relation in the join).
+   * @param join_key_attributes The IDs of equijoin attributes in \c
+   *        probe_relation.
+   * @param any_join_key_attributes_nullable If any attribute is nullable.
+   * @param hash_table The JoinHashTable to use.
+   * @param selection_on_probe A list of Scalars from probe relation,
+   *        corresponding to the relation attributes in \c output_destination.
+   * @param selection_on_build A list of Scalars from build relation,
+   *        corresponding to the relation attributes in \c output_destination.
+   * @param lookup_block_id The block id of the probe_relation.
+   * @param output_destination The InsertDestination to insert the join results.
+   * @param storage_manager The StorageManager to use.
+   **/
+  HashOuterJoinWorkOrder(const CatalogRelationSchema &build_relation,
+                         const CatalogRelationSchema &probe_relation,
+                         const std::vector<attribute_id> &join_key_attributes,
+                         const bool any_join_key_attributes_nullable,
+                         const JoinHashTable &hash_table,
+                         const std::vector<std::unique_ptr<const Scalar>> &selection_on_probe,
+                         const std::vector<std::unique_ptr<const Scalar>> &selection_on_build,
+                         const std::vector<const Type*> &selection_on_build_types,
+                         const block_id lookup_block_id,
+                         InsertDestination *output_destination,
+                         StorageManager *storage_manager)
+      : build_relation_(build_relation),
+        probe_relation_(probe_relation),
+        join_key_attributes_(join_key_attributes),
+        any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+        hash_table_(hash_table),
+        selection_on_probe_(selection_on_probe),
+        selection_on_build_(selection_on_build),
+        selection_on_build_types_(selection_on_build_types),
+        block_id_(lookup_block_id),
+        output_destination_(output_destination),
+        storage_manager_(storage_manager) {}
+
+  ~HashOuterJoinWorkOrder() override {}
+
+  void execute() override;
+
+ private:
+  const CatalogRelationSchema &build_relation_;
+  const CatalogRelationSchema &probe_relation_;
+  const std::vector<attribute_id> join_key_attributes_;
+  const bool any_join_key_attributes_nullable_;
+  const JoinHashTable &hash_table_;
+  const std::vector<std::unique_ptr<const Scalar>> &selection_on_probe_;
+  const std::vector<std::unique_ptr<const Scalar>> &selection_on_build_;
+  const std::vector<const Type*> &selection_on_build_types_;
+
+  const block_id block_id_;
+
+  InsertDestination *output_destination_;
+  StorageManager *storage_manager_;
+
+  DISALLOW_COPY_AND_ASSIGN(HashOuterJoinWorkOrder);
 };
 
 /** @} */
diff --git a/storage/HashTable.hpp b/storage/HashTable.hpp
index ab74e2c..0178970 100644
--- a/storage/HashTable.hpp
+++ b/storage/HashTable.hpp
@@ -707,6 +707,89 @@
                                FunctorT *functor) const;
 
   /**
+   * @brief Lookup (multiple) keys from a ValueAccessor, apply a functor to
+   *        the matching values and additionally call a hasMatch() function of
+   *        the functor when the first match for a key is found.
+   * @warning This method assumes that no concurrent calls to put(),
+   *          putCompositeKey(), putValueAccessor(),
+   *          putValueAccessorCompositeKey(), upsert(), upsertCompositeKey(),
+   *          upsertValueAccessor(), or upsertValueAccessorCompositeKey() are
+   *          taking place (i.e. that this HashTable is immutable for the
+   *          duration of the call and as long as the returned pointer may be
+   *          dereferenced). Concurrent calls to getSingle(),
+   *          getSingleCompositeKey(), getAll(), getAllCompositeKey(),
+   *          getAllFromValueAccessor(), getAllFromValueAccessorCompositeKey(),
+   *          forEach(), and forEachCompositeKey() are safe.
+   * @note This version is for single scalar keys. See also
+   *       getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch().
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor, which should provide two functions:
+   *        1) An operator that takes 2 arguments: const ValueAccessor& (or better
+   *        yet, a templated call operator which takes a const reference to
+   *        some subclass of ValueAccessor as its first argument) and
+   *        const ValueT&. The operator will be invoked once for each pair of a
+   *        key taken from accessor and matching value.
+   *        2) A function hasMatch that takes 1 argument: const ValueAccessor&.
+   *        The function will be called only once for a key from accessor when
+   *        the first match is found.
+   */
+  template <typename FunctorT>
+  void getAllFromValueAccessorWithExtraWorkForFirstMatch(
+      ValueAccessor *accessor,
+      const attribute_id key_attr_id,
+      const bool check_for_null_keys,
+      FunctorT *functor) const;
+
+  /**
+   * @brief Lookup (multiple) keys from a ValueAccessor, apply a functor to
+   *        the matching values and additionally call a hasMatch() function of
+   *        the functor when the first match for a key is found. Composite key
+   *        version.
+   * @warning This method assumes that no concurrent calls to put(),
+   *          putCompositeKey(), putValueAccessor(),
+   *          putValueAccessorCompositeKey(), upsert(), upsertCompositeKey(),
+   *          upsertValueAccessor(), or upsertValueAccessorCompositeKey() are
+   *          taking place (i.e. that this HashTable is immutable for the
+   *          duration of the call and as long as the returned pointer may be
+   *          dereferenced). Concurrent calls to getSingle(),
+   *          getSingleCompositeKey(), getAll(), getAllCompositeKey(),
+   *          getAllFromValueAccessor(), getAllFromValueAccessorCompositeKey(),
+   *          forEach(), and forEachCompositeKey() are safe.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor, which should provide two functions:
+   *        1) An operator that takes 2 arguments: const ValueAccessor& (or better
+   *        yet, a templated call operator which takes a const reference to
+   *        some subclass of ValueAccessor as its first argument) and
+   *        const ValueT&. The operator will be invoked once for each pair of a
+   *        key taken from accessor and matching value.
+   *        2) A function hasMatch that takes 1 argument: const ValueAccessor&.
+   *        The function will be called only once for a key from accessor when
+   *        the first match is found.
+   */
+  template <typename FunctorT>
+  void getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      FunctorT *functor) const;
+
+  /**
    * @brief Lookup (multiple) keys from a ValueAccessor and apply a functor to
    *        the matching values. Composite key version.
    *
@@ -746,6 +829,113 @@
                                            FunctorT *functor) const;
 
   /**
+   * @brief Apply the functor to each key with a match in the hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor which should provide an operator that
+   *        takes 1 argument: const ValueAccessor&. The operator will be called
+   *        only once for a key from accessor if there is a match.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchFound(ValueAccessor *accessor,
+                                                const attribute_id key_attr_id,
+                                                const bool check_for_null_keys,
+                                                FunctorT *functor) const {
+    return runOverKeysFromValueAccessor<true>(accessor,
+                                              key_attr_id,
+                                              check_for_null_keys,
+                                              functor);
+  }
+
+  /**
+   * @brief Apply the functor to each key with a match in the hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor which should provide an operator that
+   *        takes 1 argument: const ValueAccessor&. The operator will be called
+   *        only once for a key from accessor if there is a match.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchFoundCompositeKey(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      FunctorT *functor) const {
+    return runOverKeysFromValueAccessorCompositeKey<true>(accessor,
+                                                          key_attr_ids,
+                                                          check_for_null_keys,
+                                                          functor);
+  }
+
+  /**
+   * @brief Apply the functor to each key without a match in the hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor which should provide an operator that
+   *        takes 1 argument: const ValueAccessor&. The operator will be called
+   *        only once for a key from accessor if there is no match.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchNotFound(
+      ValueAccessor *accessor,
+      const attribute_id key_attr_id,
+      const bool check_for_null_keys,
+      FunctorT *functor) const {
+    return runOverKeysFromValueAccessor<false>(accessor,
+                                               key_attr_id,
+                                               check_for_null_keys,
+                                               functor);
+  }
+
+  /**
+   * @brief Apply the functor to each key without a match in the hash table.
+   *
+   * @param accessor A ValueAccessor which will be used to access keys.
+   *        beginIteration() should be called on accessor before calling this
+   *        method.
+   * @param key_attr_id The attribute ID of the keys to be read from accessor.
+   * @param check_for_null_keys If true, each key will be checked to see if it
+   *        is null before looking it up (null keys are skipped). This must be
+   *        set to true if some of the keys that will be read from accessor may
+   *        be null.
+   * @param functor A pointer to a functor which should provide an operator that
+   *        takes 1 argument: const ValueAccessor&. The operator will be called
+   *        only once for a key from accessor if there is no match.
+   */
+  template <typename FunctorT>
+  void runOverKeysFromValueAccessorIfMatchNotFoundCompositeKey(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      FunctorT *functor) const {
+    return runOverKeysFromValueAccessorCompositeKey<false>(accessor,
+                                                           key_attr_ids,
+                                                           check_for_null_keys,
+                                                           functor);
+  }
+
+  /**
    * @brief Apply a functor to each key, value pair in this hash table.
    *
    * @warning This method assumes that no concurrent calls to put(),
@@ -965,6 +1155,10 @@
                                            const ValueT **value,
                                            std::size_t *entry_num) const = 0;
 
+  // Return true if key exists in the hash table.
+  virtual bool hasKey(const TypedValue &key) const = 0;
+  virtual bool hasCompositeKey(const std::vector<TypedValue> &key) const = 0;
+
   // For a resizable HashTable, grow to accomodate more entries. If
   // 'extra_buckets' is not zero, it may serve as a "hint" to implementations
   // that at least the requested number of extra buckets are required when
@@ -1048,6 +1242,21 @@
     return false;
   }
 
+  // If run_if_match_found is true, apply the functor to each key if a match is
+  // found; otherwise, apply the functor if no match is found.
+  template <bool run_if_match_found, typename FunctorT>
+  void runOverKeysFromValueAccessor(ValueAccessor *accessor,
+                                    const attribute_id key_attr_id,
+                                    const bool check_for_null_keys,
+                                    FunctorT *functor) const;
+
+  template <bool run_if_match_found, typename FunctorT>
+  void runOverKeysFromValueAccessorCompositeKey(
+      ValueAccessor *accessor,
+      const std::vector<attribute_id> &key_attr_ids,
+      const bool check_for_null_keys,
+      FunctorT *functor) const;
+
   // Method containing the actual logic implementing getAllFromValueAccessor().
   // Has extra template parameters that control behavior to avoid some
   // inner-loop branching.
@@ -1678,6 +1887,184 @@
           bool force_key_copy,
           bool allow_duplicate_keys>
 template <typename FunctorT>
+void HashTable<ValueT,
+               resizable,
+               serializable,
+               force_key_copy,
+               allow_duplicate_keys>::
+    getAllFromValueAccessorWithExtraWorkForFirstMatch(
+        ValueAccessor *accessor,
+        const attribute_id key_attr_id,
+        const bool check_for_null_keys,
+        FunctorT *functor) const {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      TypedValue key = accessor->getTypedValue(key_attr_id);
+      if (check_for_null_keys && key.isNull()) {
+        continue;
+      }
+      const std::size_t hash_code = adjust_hashes_ ? AdjustHash(key.getHash())
+                                                   : key.getHash();
+      std::size_t entry_num = 0;
+      const ValueT *value;
+      if (getNextEntryForKey(key, hash_code, &value, &entry_num)) {
+        functor->recordMatch(*accessor);
+        (*functor)(*accessor, *value);
+        if (!allow_duplicate_keys) {
+           continue;
+        }
+        while (getNextEntryForKey(key, hash_code, &value, &entry_num)) {
+          (*functor)(*accessor, *value);
+        }
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+template <typename FunctorT>
+void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::getAllFromValueAccessorCompositeKeyWithExtraWorkForFirstMatch(
+        ValueAccessor *accessor,
+        const std::vector<attribute_id> &key_attr_ids,
+        const bool check_for_null_keys,
+        FunctorT *functor) const {
+  DEBUG_ASSERT(key_types_.size() == key_attr_ids.size());
+  std::vector<TypedValue> key_vector;
+  key_vector.resize(key_attr_ids.size());
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      bool null_key = false;
+      for (std::vector<attribute_id>::size_type key_idx = 0;
+           key_idx < key_types_.size();
+           ++key_idx) {
+        key_vector[key_idx] = accessor->getTypedValue(key_attr_ids[key_idx]);
+        if (check_for_null_keys && key_vector[key_idx].isNull()) {
+          null_key = true;
+          break;
+        }
+      }
+      if (null_key) {
+        continue;
+      }
+
+      const std::size_t hash_code = adjust_hashes_ ? AdjustHash(hashCompositeKey(key_vector))
+                                                   : hashCompositeKey(key_vector);
+      std::size_t entry_num = 0;
+      const ValueT *value;
+      if (getNextEntryForCompositeKey(key_vector, hash_code, &value, &entry_num)) {
+        functor->recordMatch(*accessor);
+        (*functor)(*accessor, *value);
+        if (!allow_duplicate_keys) {
+          continue;
+        }
+        while (getNextEntryForCompositeKey(key_vector, hash_code, &value, &entry_num)) {
+          (*functor)(*accessor, *value);
+        }
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+template <bool run_if_match_found, typename FunctorT>
+void HashTable<ValueT,
+               resizable,
+               serializable,
+               force_key_copy,
+               allow_duplicate_keys>::
+    runOverKeysFromValueAccessor(ValueAccessor *accessor,
+                                 const attribute_id key_attr_id,
+                                 const bool check_for_null_keys,
+                                 FunctorT *functor) const {
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      TypedValue key = accessor->getTypedValue(key_attr_id);
+      if (check_for_null_keys && key.isNull()) {
+        if (!run_if_match_found) {
+          (*functor)(*accessor);
+          continue;
+        }
+      }
+      if (run_if_match_found) {
+        if (hasKey(key)) {
+          (*functor)(*accessor);
+        }
+      } else {
+        if (!hasKey(key)) {
+          (*functor)(*accessor);
+        }
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+template <bool run_if_match_found, typename FunctorT>
+void HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::runOverKeysFromValueAccessorCompositeKey(ValueAccessor *accessor,
+                                               const std::vector<attribute_id> &key_attr_ids,
+                                               const bool check_for_null_keys,
+                                               FunctorT *functor) const {
+  DEBUG_ASSERT(key_types_.size() == key_attr_ids.size());
+  std::vector<TypedValue> key_vector;
+  key_vector.resize(key_attr_ids.size());
+  InvokeOnAnyValueAccessor(
+      accessor,
+      [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+    while (accessor->next()) {
+      bool null_key = false;
+      for (std::vector<attribute_id>::size_type key_idx = 0;
+           key_idx < key_types_.size();
+           ++key_idx) {
+        key_vector[key_idx] = accessor->getTypedValue(key_attr_ids[key_idx]);
+        if (check_for_null_keys && key_vector[key_idx].isNull()) {
+          null_key = true;
+          break;
+        }
+      }
+      if (null_key) {
+        if (!run_if_match_found) {
+          (*functor)(*accessor);
+          continue;
+        }
+      }
+
+      if (run_if_match_found) {
+        if (hasCompositeKey(key_vector)) {
+          (*functor)(*accessor);
+        }
+      } else if (!hasCompositeKey(key_vector)) {
+        (*functor)(*accessor);
+      }
+    }
+  });  // NOLINT(whitespace/parens)
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+template <typename FunctorT>
 std::size_t HashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
     ::forEach(FunctorT *functor) const {
   std::size_t entries_visited = 0;
diff --git a/storage/LinearOpenAddressingHashTable.hpp b/storage/LinearOpenAddressingHashTable.hpp
index e5ca0b0..a2ebe18 100644
--- a/storage/LinearOpenAddressingHashTable.hpp
+++ b/storage/LinearOpenAddressingHashTable.hpp
@@ -151,6 +151,9 @@
                                    const ValueT **value,
                                    std::size_t *entry_num) const override;
 
+  bool hasKey(const TypedValue &key) const override;
+  bool hasCompositeKey(const std::vector<TypedValue> &key) const override;
+
   void resize(const std::size_t extra_buckets,
               const std::size_t extra_variable_storage,
               const std::size_t retry_num = 0) override;
@@ -1099,6 +1102,75 @@
   return false;
 }
 
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+bool LinearOpenAddressingHashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::hasKey(const TypedValue &key) const {
+  DEBUG_ASSERT(this->key_types_.size() == 1);
+  DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+  const std::size_t hash_code = this->AdjustHash(key.getHash());
+  for (std::size_t bucket_num = hash_code % header_->num_buckets;
+       bucket_num < header_->num_buckets + header_->num_overflow_buckets;
+       ++bucket_num) {
+    const char *bucket = static_cast<const char*>(hash_buckets_) + bucket_num * bucket_size_;
+    const std::size_t bucket_hash
+        = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+    if (bucket_hash == kEmptyHash) {
+      // Hit an empty bucket, so the search is finished
+      // without finding any match.
+      return false;
+    }
+
+    // None of the get methods should be called while inserts are still taking
+    // place.
+    DEBUG_ASSERT(bucket_hash != kPendingHash);
+
+    if ((bucket_hash == hash_code) && key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      return true;
+    }
+  }
+  return false;
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+bool LinearOpenAddressingHashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::hasCompositeKey(const std::vector<TypedValue> &key) const {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  const std::size_t hash_code = this->AdjustHash(this->hashCompositeKey(key));
+  for (std::size_t bucket_num = hash_code % header_->num_buckets;
+       bucket_num < header_->num_buckets + header_->num_overflow_buckets;
+       ++bucket_num) {
+    const char *bucket = static_cast<const char*>(hash_buckets_) + bucket_num * bucket_size_;
+    const std::size_t bucket_hash
+        = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+    if (bucket_hash == kEmptyHash) {
+      // Hit an empty bucket, so the search is finished
+      // without finding any match.
+      return false;
+    }
+
+    // None of the get methods should be called while inserts are still taking
+    // place.
+    DEBUG_ASSERT(bucket_hash != kPendingHash);
+
+    if ((bucket_hash == hash_code) && key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Match located.
+      return true;
+    }
+  }
+  return false;
+}
+
 // TODO(chasseur): Smarter heuristics that are more selective about whether
 // to grow hash buckets, variable-length storage, or both, and to what degree.
 template <typename ValueT,
diff --git a/storage/SeparateChainingHashTable.hpp b/storage/SeparateChainingHashTable.hpp
index c93e783..c096b1b 100644
--- a/storage/SeparateChainingHashTable.hpp
+++ b/storage/SeparateChainingHashTable.hpp
@@ -145,6 +145,9 @@
                                    const ValueT **value,
                                    std::size_t *entry_num) const override;
 
+  bool hasKey(const TypedValue &key) const override;
+  bool hasCompositeKey(const std::vector<TypedValue> &key) const override;
+
   void resize(const std::size_t extra_buckets,
               const std::size_t extra_variable_storage,
               const std::size_t retry_num = 0) override;
@@ -1054,6 +1057,57 @@
           bool serializable,
           bool force_key_copy,
           bool allow_duplicate_keys>
+bool SeparateChainingHashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::hasKey(const TypedValue &key) const {
+  DEBUG_ASSERT(this->key_types_.size() == 1);
+  DEBUG_ASSERT(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+  const std::size_t hash_code = key.getHash();
+  std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) && key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+      // Find a match.
+      return true;
+    }
+    bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+  }
+  return false;
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
+bool SeparateChainingHashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
+    ::hasCompositeKey(const std::vector<TypedValue> &key) const {
+  DEBUG_ASSERT(this->key_types_.size() == key.size());
+
+  const std::size_t hash_code = this->hashCompositeKey(key);
+  std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
+    const char *bucket = static_cast<const char*>(buckets_) + (bucket_ref - 1) * bucket_size_;
+    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t*>(
+        bucket + sizeof(std::atomic<std::size_t>));
+    if ((bucket_hash == hash_code) && key_manager_.compositeKeyCollisionCheck(key, bucket)) {
+      // Find a match.
+      return true;
+    }
+    bucket_ref = reinterpret_cast<const std::atomic<std::size_t>*>(bucket)->load(std::memory_order_relaxed);
+  }
+  return false;
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
 void SeparateChainingHashTable<ValueT, resizable, serializable, force_key_copy, allow_duplicate_keys>
     ::resize(const std::size_t extra_buckets,
              const std::size_t extra_variable_storage,
diff --git a/storage/SimpleScalarSeparateChainingHashTable.hpp b/storage/SimpleScalarSeparateChainingHashTable.hpp
index b2d894d..1619e34 100644
--- a/storage/SimpleScalarSeparateChainingHashTable.hpp
+++ b/storage/SimpleScalarSeparateChainingHashTable.hpp
@@ -182,6 +182,12 @@
     return getNextEntryForKey(key.front(), hash_code, value, entry_num);
   }
 
+  bool hasKey(const TypedValue &key) const override;
+
+  bool hasCompositeKey(const std::vector<TypedValue> &key) const override {
+    return false;
+  }
+
   void resize(const std::size_t extra_buckets,
               const std::size_t extra_variable_storage,
               const std::size_t retry_num = 0) override;
@@ -772,6 +778,37 @@
           bool serializable,
           bool force_key_copy,
           bool allow_duplicate_keys>
+bool SimpleScalarSeparateChainingHashTable<ValueT,
+                                      resizable,
+                                      serializable,
+                                      force_key_copy,
+                                      allow_duplicate_keys>
+    ::hasKey(const TypedValue &key) const {
+  DCHECK(!allow_duplicate_keys);
+  DCHECK(key.isPlausibleInstanceOf(this->key_types_.front()->getSignature()));
+
+  const std::size_t hash_code = key.getHashScalarLiteral();
+  std::size_t bucket_ref = slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
+  while (bucket_ref != 0) {
+    DCHECK_NE(bucket_ref, std::numeric_limits<std::size_t>::max());
+
+    const Bucket &bucket = buckets_[bucket_ref - 1];
+    if (bucket.hash == hash_code) {
+      // Match located.
+      return true;
+    }
+    bucket_ref = bucket.next.load(std::memory_order_relaxed);
+  }
+
+  // Reached the end of the chain and didn't find a match.
+  return false;
+}
+
+template <typename ValueT,
+          bool resizable,
+          bool serializable,
+          bool force_key_copy,
+          bool allow_duplicate_keys>
 void SimpleScalarSeparateChainingHashTable<ValueT,
                                            resizable,
                                            serializable,
diff --git a/storage/StorageBlock.hpp b/storage/StorageBlock.hpp
index da3bc70..97813e2 100644
--- a/storage/StorageBlock.hpp
+++ b/storage/StorageBlock.hpp
@@ -187,6 +187,17 @@
   }
 
   /**
+   * @brief Get the flag vector indicating for each IndexSubBlock
+   *        whether it is consistent.
+   *
+   * @return The flag vector indicating for each IndexSubBlock
+   *         whether it is consistent.
+   */
+  const std::vector<bool>& getIndicesConsistent() const {
+    return indices_consistent_;
+  }
+
+  /**
    * @brief Get one of this block's IndexSubBlocks.
    *
    * @param index_id The ID of the IndexSubBlock. This is simply a serial
@@ -201,6 +212,15 @@
   }
 
   /**
+   * @brief Get the IndexSubBlock vector.
+   *
+   * @return The IndexSubBlock vector.
+   */
+  const PtrVector<IndexSubBlock>& getIndices() const {
+    return indices_;
+  }
+
+  /**
    * @brief Insert a single tuple into this block.
    *
    * @param tuple The tuple to insert.