IMPALA-9126: part 4: hash join builder manages spilling

This is the final patch for IMPALA-9126.

This will allow the many:1 relationship of probe:build
partitions that we need for the shared join build.

Key changes:
* Builder picks the next spilled partition to process.
* Partitions are identified by unique ID so can be
  decoupled between build and probe.
* unique_ptr is used to manage build partitions. This
  helps document the lifecycle of the partitions better,
  particularly when they are handed off to
  PartitionedHashJoinNode.

Testing:
* Ran exhaustive tests.
* Ran a single node TPC-H and TPC-DS stress test with 1000 queries.

Perf:
Ran a single node TPC-H 30 test against master from
before IMPALA-9126 changes. No significant perf
change.

Change-Id: I6de5f62e3eacf80f72c8ea0ed8cba012f0f53c90
Reviewed-on: http://gerrit.cloudera.org:8080/14790
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc
index 8d8e42d..a4bfcdb 100644
--- a/be/src/exec/partitioned-hash-join-builder-ir.cc
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -61,7 +61,7 @@
     }
     const uint32_t hash = expr_vals_cache->CurExprValuesHash();
     const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
-    Partition* partition = hash_partitions_[partition_idx];
+    Partition* partition = hash_partitions_[partition_idx].get();
     if (UNLIKELY(!AppendRow(partition->build_rows(), build_row, &status))) {
       return status;
     }
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 8c1023a..a865e8f 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -187,13 +187,13 @@
 
 Status PhjBuilder::FlushFinal(RuntimeState* state) {
   int64_t num_build_rows = 0;
-  for (Partition* partition : hash_partitions_) {
+  for (const unique_ptr<Partition>& partition : hash_partitions_) {
     num_build_rows += partition->build_rows()->num_rows();
   }
 
   if (num_build_rows > 0) {
     double largest_fraction = 0.0;
-    for (Partition* partition : hash_partitions_) {
+    for (const unique_ptr<Partition>& partition : hash_partitions_) {
       largest_fraction = max(largest_fraction,
           partition->build_rows()->num_rows() / static_cast<double>(num_build_rows));
     }
@@ -205,7 +205,7 @@
     ss << Substitute("PHJ(node_id=$0) partitioned(level=$1) $2 rows into:", join_node_id_,
         hash_partitions_[0]->level(), num_build_rows);
     for (int i = 0; i < hash_partitions_.size(); ++i) {
-      Partition* partition = hash_partitions_[i];
+      Partition* partition = hash_partitions_[i].get();
       double percent = num_build_rows == 0 ? 0.0 : partition->build_rows()->num_rows()
               * 100 / static_cast<double>(num_build_rows);
       ss << "  " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
@@ -260,6 +260,7 @@
   state_ = HashJoinState::PARTITIONING_BUILD;
   expr_results_pool_->Clear();
   non_empty_build_ = false;
+  next_partition_id_ = 0;
   CloseAndDeletePartitions(row_batch);
 }
 
@@ -303,24 +304,30 @@
   return "";
 }
 
-Status PhjBuilder::CreateAndPreparePartition(int level, Partition** partition) {
-  all_partitions_.emplace_back(new Partition(runtime_state_, this, level));
-  *partition = all_partitions_.back().get();
-  RETURN_IF_ERROR((*partition)->build_rows()->Init(join_node_label_, true));
+Status PhjBuilder::CreateAndPreparePartition(
+    int level, unique_ptr<Partition>* partition) {
+  *partition = make_unique<Partition>(runtime_state_, this, level);
+  Status status = (*partition)->build_rows()->Init(join_node_label_, true);
+  if (!status.ok()) goto error;
   bool got_buffer;
-  RETURN_IF_ERROR((*partition)->build_rows()->PrepareForWrite(&got_buffer));
+  status = (*partition)->build_rows()->PrepareForWrite(&got_buffer);
+  if (!status.ok()) goto error;
   DCHECK(got_buffer)
       << "Accounted in min reservation" << buffer_pool_client_->DebugString();
   return Status::OK();
+ error:
+  (*partition)->Close(nullptr);
+  partition->reset();
+  return status;
 }
 
 Status PhjBuilder::CreateHashPartitions(int level) {
   DCHECK(hash_partitions_.empty());
   ht_ctx_->set_level(level); // Set the hash function for partitioning input.
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    Partition* new_partition;
+    unique_ptr<Partition> new_partition;
     RETURN_IF_ERROR(CreateAndPreparePartition(level, &new_partition));
-    hash_partitions_.push_back(new_partition);
+    hash_partitions_.push_back(std::move(new_partition));
   }
   COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
   COUNTER_SET(max_partition_level_, level);
@@ -348,11 +355,11 @@
   Partition* best_candidate = nullptr;
   if (null_aware_partition_ != nullptr && null_aware_partition_->CanSpill()) {
     // Spill null-aware partition first if possible - it is always processed last.
-    best_candidate = null_aware_partition_;
+    best_candidate = null_aware_partition_.get();
   } else {
     // Iterate over the partitions and pick the largest partition to spill.
     int64_t max_freed_mem = 0;
-    for (Partition* candidate : hash_partitions_) {
+    for (const unique_ptr<Partition>& candidate : hash_partitions_) {
       if (!candidate->CanSpill()) continue;
       int64_t mem = candidate->build_rows()->BytesPinned(false);
       if (candidate->hash_tbl() != nullptr) {
@@ -363,7 +370,7 @@
       }
       if (mem > max_freed_mem) {
         max_freed_mem = mem;
-        best_candidate = candidate;
+        best_candidate = candidate.get();
       }
     }
   }
@@ -400,7 +407,7 @@
   DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());
 
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    Partition* partition = hash_partitions_[i];
+    Partition* partition = hash_partitions_[i].get();
     if (partition->build_rows()->num_rows() == 0) {
       // This partition is empty, no need to do anything else.
       partition->Close(nullptr);
@@ -424,7 +431,7 @@
   RETURN_IF_ERROR(ReserveProbeBuffers(input_is_spilled));
 
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    Partition* partition = hash_partitions_[i];
+    Partition* partition = hash_partitions_[i].get();
     if (partition->IsClosed() || partition->is_spilled()) continue;
 
     bool built = false;
@@ -458,7 +465,7 @@
     RETURN_IF_ERROR(SpillPartition(
           BufferedTupleStream::UNPIN_ALL, &spilled_partition));
     // Don't need to create a probe stream for the null-aware partition.
-    if (spilled_partition != null_aware_partition_) {
+    if (spilled_partition != null_aware_partition_.get()) {
       addtl_reservation += per_stream_reservation;
     }
   }
@@ -487,33 +494,44 @@
   probe_client->RestoreReservation(&probe_stream_reservation_, saved_reservation);
 }
 
-int PhjBuilder::GetNumSpilledPartitions(const vector<Partition*>& partitions) {
+int PhjBuilder::GetNumSpilledPartitions(const vector<unique_ptr<Partition>>& partitions) {
   int num_spilled = 0;
   for (int i = 0; i < partitions.size(); ++i) {
-    Partition* partition = partitions[i];
+    Partition* partition = partitions[i].get();
     DCHECK(partition != nullptr);
     if (!partition->IsClosed() && partition->is_spilled()) ++num_spilled;
   }
   return num_spilled;
 }
 
-void PhjBuilder::DoneProbingHashPartitions(const bool retain_partition[PARTITION_FANOUT],
-    list<Partition*>* output_partitions, RowBatch* batch) {
+void PhjBuilder::DoneProbingHashPartitions(
+    const int64_t num_spilled_probe_rows[PARTITION_FANOUT],
+    deque<unique_ptr<Partition>>* output_partitions, RowBatch* batch) {
   DCHECK(output_partitions->empty());
+  if (state_ == HashJoinState::REPARTITIONING_PROBE) {
+    // Finished repartitioning this partition. Discard before pushing more spilled
+    // partitions onto 'spilled_partitions_'.
+    DCHECK(!spilled_partitions_.empty());
+    spilled_partitions_.pop_back();
+  }
+
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    PhjBuilder::Partition* partition = hash_partitions_[i];
+    unique_ptr<PhjBuilder::Partition> partition = std::move(hash_partitions_[i]);
     if (partition->IsClosed()) continue;
+    partition->IncrementNumSpilledProbeRows(num_spilled_probe_rows[i]);
     if (partition->is_spilled()) {
       DCHECK(partition->hash_tbl() == nullptr) << DebugString();
       DCHECK_EQ(partition->build_rows()->BytesPinned(false), 0)
           << "Build was fully unpinned in BuildHashTablesAndPrepareProbeStreams()";
-      // Release resources associated with completed partitions.
-      if (!retain_partition[i]) {
+      if (partition->num_spilled_probe_rows() == 0
+          && !NeedToProcessUnmatchedBuildRows(join_op_)) {
         COUNTER_ADD(num_hash_table_builds_skipped_, 1);
         partition->Close(nullptr);
+      } else {
+        spilled_partitions_.push_back(std::move(partition));
       }
     } else if (NeedToProcessUnmatchedBuildRows(join_op_)) {
-      output_partitions->push_back(partition);
+      output_partitions->push_back(std::move(partition));
     } else {
       // No more processing is required for this partition.
       partition->Close(batch);
@@ -523,24 +541,30 @@
 }
 
 void PhjBuilder::DoneProbingSinglePartition(
-    Partition* partition, std::list<Partition*>* output_partitions, RowBatch* batch) {
+      deque<unique_ptr<Partition>>* output_partitions, RowBatch* batch) {
   if (NeedToProcessUnmatchedBuildRows(join_op_)) {
     // If the build partition was in memory, we are done probing this partition.
     // In case of right-outer, right-anti and full-outer joins, we move this partition
     // to the list of partitions that we need to output their unmatched build rows.
-    output_partitions->push_back(partition);
+    output_partitions->push_back(std::move(spilled_partitions_.back()));
   } else {
     // In any other case, just close the input build partition.
-    partition->Close(IsLeftSemiJoin(join_op_) ? nullptr : batch);
+    spilled_partitions_.back()->Close(IsLeftSemiJoin(join_op_) ? nullptr : batch);
   }
+  spilled_partitions_.pop_back();
 }
 
 void PhjBuilder::CloseAndDeletePartitions(RowBatch* row_batch) {
   // Close all the partitions and clean up all references to them.
-  for (unique_ptr<Partition>& partition : all_partitions_) partition->Close(row_batch);
-  all_partitions_.clear();
+  for (unique_ptr<Partition>& partition : hash_partitions_) {
+    partition->Close(row_batch);
+  }
   hash_partitions_.clear();
-  null_aware_partition_ = nullptr;
+  for (unique_ptr<Partition>& partition : spilled_partitions_) {
+    partition->Close(row_batch);
+  }
+  spilled_partitions_.clear();
+  CloseNullAwarePartition();
 }
 
 void PhjBuilder::AllocateRuntimeFilters() {
@@ -609,13 +633,20 @@
   }
 }
 
-Status PhjBuilder::BeginSpilledProbe(bool empty_probe, Partition* partition,
-    BufferPool::ClientHandle* probe_client, bool* repartitioned, int* level,
-    HashPartitions* new_partitions) {
-  DCHECK(partition->is_spilled());
+Status PhjBuilder::BeginSpilledProbe(
+    BufferPool::ClientHandle* probe_client, bool* repartitioned,
+     Partition** input_partition, HashPartitions* new_partitions) {
+  DCHECK(!spilled_partitions_.empty());
   DCHECK_EQ(0, hash_partitions_.size());
+  // Pick the next spilled partition to process. The partition will stay in
+  // 'spilled_partitions_' until we are done probing it or repartitioning its probe.
+  // Thus it will remain valid as long as it's needed and always get cleaned up in
+  // Close(), even if an error occurs.
+  Partition* partition = spilled_partitions_.back().get();
+  *input_partition = partition;
+  DCHECK(partition->is_spilled()) << partition->DebugString();
 
-  if (empty_probe) {
+  if (partition->num_spilled_probe_rows() == 0) {
     // If there are no probe rows, there's no need to build the hash table, and
     // only partitions with NeedToProcessUnmatcheBuildRows() will have been added
     // to 'spilled_partitions_' in DoneProbingHashPartitions().
@@ -629,7 +660,6 @@
     COUNTER_ADD(num_hash_table_builds_skipped_, 1);
     UpdateState(HashJoinState::PROBING_SPILLED_PARTITION);
     *repartitioned = false;
-    *level = partition->level();
     return Status::OK();
   }
 
@@ -645,7 +675,6 @@
     TransferProbeStreamReservation(probe_client);
     UpdateState(HashJoinState::PROBING_SPILLED_PARTITION);
     *repartitioned = false;
-    *level = partition->level();
     return Status::OK();
   }
   // This build partition still does not fit in memory, repartition.
@@ -679,7 +708,6 @@
   }
   TransferProbeStreamReservation(probe_client);
   *repartitioned = true;
-  *level = ht_ctx_->level();
   *new_partitions = HashPartitions(ht_ctx_->level(), &hash_partitions_, non_empty_build_);
   return Status::OK();
 }
@@ -723,7 +751,7 @@
 int64_t PhjBuilder::LargestPartitionRows() const {
   int64_t max_rows = 0;
   for (int i = 0; i < hash_partitions_.size(); ++i) {
-    Partition* partition = hash_partitions_[i];
+    Partition* partition = hash_partitions_[i].get();
     DCHECK(partition != nullptr);
     if (partition->IsClosed()) continue;
     int64_t rows = partition->build_rows()->num_rows();
@@ -740,7 +768,10 @@
 }
 
 PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int level)
-  : parent_(parent), is_spilled_(false), level_(level) {
+  : parent_(parent),
+    id_(parent->next_partition_id_++),
+    is_spilled_(false),
+    level_(level) {
   build_rows_ = make_unique<BufferedTupleStream>(state, parent_->row_desc_,
       parent_->buffer_pool_client_, parent->spillable_buffer_size_,
       parent->max_row_buffer_size_);
@@ -875,7 +906,7 @@
 
 std::string PhjBuilder::Partition::DebugString() {
   stringstream ss;
-  ss << "<Partition>: ptr=" << this;
+  ss << "<Partition>: ptr=" << this << " id=" << id_;
   if (IsClosed()) {
     ss << " Closed";
     return ss.str();
@@ -891,6 +922,7 @@
   if (hash_tbl_ != nullptr) {
     ss << "    Hash Table Rows: " << hash_tbl_->size();
   }
+  ss << "    Spilled Probe Rows: " << num_spilled_probe_rows_ << endl;
   return ss.str();
 }
 
@@ -935,6 +967,11 @@
   for (int i = 0; i < hash_partitions_.size(); ++i) {
     ss << " Hash partition " << i << " " << hash_partitions_[i]->DebugString() << endl;
   }
+  ss << " Spilled partitions: " << spilled_partitions_.size() << ":" << endl;
+  for (int i = 0; i < spilled_partitions_.size(); ++i) {
+    ss << " Spilled partition " << i << " "
+       << spilled_partitions_[i]->DebugString() << endl;
+  }
   if (null_aware_partition_ != nullptr) {
     ss << "Null-aware partition: " << null_aware_partition_->DebugString();
   }
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 6e614e8..9ac23b1 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -18,10 +18,10 @@
 #ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H
 #define IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H
 
-#include <boost/scoped_ptr.hpp>
+#include <deque>
 #include <memory>
-#include <list>
 #include <vector>
+#include <boost/scoped_ptr.hpp>
 
 #include "common/object-pool.h"
 #include "common/status.h"
@@ -112,6 +112,8 @@
 
   class Partition;
 
+  using PartitionId = int;
+
   PhjBuilder(int join_node_id, const std::string& join_node_label, TJoinOp::type join_op,
       const RowDescriptor* build_row_desc, RuntimeState* state,
       BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
@@ -143,8 +145,9 @@
   /// Represents a set of hash partitions to be handed off to the probe side.
   struct HashPartitions {
     HashPartitions() { Reset(); }
-    HashPartitions(
-        int level, const std::vector<Partition*>* hash_partitions, bool non_empty_build)
+    HashPartitions(int level,
+        const std::vector<std::unique_ptr<Partition>>* hash_partitions,
+        bool non_empty_build)
       : level(level),
         hash_partitions(hash_partitions),
         non_empty_build(non_empty_build) {}
@@ -162,7 +165,7 @@
     // The current set of hash partitions. Always contains PARTITION_FANOUT partitions.
     // The partitions may be in-memory, spilled, or closed. Valid until
     // DoneProbingHashPartitions() is called.
-    const std::vector<Partition*>* hash_partitions;
+    const std::vector<std::unique_ptr<Partition>>* hash_partitions;
 
     // True iff the build side had at least one row in a partition.
     bool non_empty_build;
@@ -176,9 +179,10 @@
   /// TODO: IMPALA-9156: this will be a synchronization point for shared join build.
   HashPartitions BeginInitialProbe(BufferPool::ClientHandle* probe_client);
 
-  /// Prepare to process the probe side of 'partition', either by building a hash
-  /// table over 'partition', or if does not fit in memory, by repartitioning into
-  /// PARTITION_FANOUT new partitions.
+  /// Pick a spilled partition to process (returned in *input_partition) and
+  /// prepare to probe it. Builds a hash table over *input_partition
+  /// if it fits in memory. Otherwise repartition it into PARTITION_FANOUT
+  /// new partitions.
   ///
   /// When this function returns successfully, 'probe_client' will have enough
   /// reservation for a read buffer for the input probe stream and, if repartitioning,
@@ -189,24 +193,24 @@
   /// previous hash partitions must have been cleared with DoneProbingHashPartitions().
   /// The new hash partitions are returned in 'new_partitions'.
   /// TODO: IMPALA-9156: this will be a synchronization point for shared join build.
-  Status BeginSpilledProbe(bool empty_probe, Partition* partition,
-      BufferPool::ClientHandle* probe_client, bool* repartitioned, int* level,
-      HashPartitions* new_partitions);
+  Status BeginSpilledProbe(BufferPool::ClientHandle* probe_client, bool* repartitioned,
+      Partition** input_partition, HashPartitions* new_partitions);
 
   /// Called after probing of the hash partitions returned by BeginInitialProbe() or
-  /// BeginSpilledProbe() (when *repartitioning as true) is complete,
-  /// i.e. all of the corresponding probe rows have been processed by
-  /// PartitionedHashJoinNode. Appends in-memory partitions that may contain build
-  /// rows to output to 'output_partitions' for build modes like right outer join
-  /// that output unmatched rows. Close other in-memory partitions, attaching any
-  /// tuple data to 'batch' if 'batch' is non-NULL. Closes spilled partitions if
-  /// 'retain_spilled_partition' is false for that partition index.
+  /// BeginSpilledProbe() (when *repartitioned as true) is complete, i.e. all of the
+  /// corresponding probe rows have been processed by PartitionedHashJoinNode. The number
+  /// of spilled probe rows per partition must be passed in via 'num_spilled_probe_rows'
+  /// so that the builder can determine whether a spilled partition needs to be retained.
+  /// Appends in-memory partitions that may contain build rows to output to
+  /// 'output_partitions' for build modes like right outer join that output unmatched
+  /// rows. Close other in-memory partitions, attaching any tuple data to 'batch' if
+  /// 'batch' is non-NULL. Closes spilled partitions if no more processing is needed.
   /// TODO: IMPALA-9156: this will be a synchronization point for shared join build.
-  void DoneProbingHashPartitions(const bool retain_spilled_partition[PARTITION_FANOUT],
-      std::list<Partition*>* output_partitions, RowBatch* batch);
+  void DoneProbingHashPartitions(const int64_t num_spilled_probe_rows[PARTITION_FANOUT],
+      std::deque<std::unique_ptr<Partition>>* output_partitions, RowBatch* batch);
 
   /// Called after probing of a single spilled partition returned by
-  /// BeginSpilledProbe() when *repartitioning is false.
+  /// BeginSpilledProbe() when *repartitioned is false.
   ///
   /// If the join op requires outputting unmatched build rows and the partition
   /// may have build rows to return, it is appended to 'output_partitions'. Partitions
@@ -218,7 +222,7 @@
   /// tuple data to 'batch' if 'batch' is non-NULL.
   /// TODO: IMPALA-9156: this will be a synchronization point for shared join build.
   void DoneProbingSinglePartition(
-      Partition* partition, std::list<Partition*>* output_partitions, RowBatch* batch);
+      std::deque<std::unique_ptr<Partition>>* output_partitions, RowBatch* batch);
 
   /// Close the null aware partition (if there is one) and set it to NULL.
   /// TODO: IMPALA-9176: improve the encapsulation of the null-aware partition.
@@ -228,7 +232,7 @@
       // from the probe side - i.e. the RowDescriptor for PartitionedHashJoinNode does
       // not include the build tuple.
       null_aware_partition_->Close(nullptr);
-      null_aware_partition_ = nullptr;
+      null_aware_partition_.reset();
     }
   }
 
@@ -250,7 +254,7 @@
 
   /// Accessor to allow PartitionedHashJoinNode to access null_aware_partition_.
   /// TODO: IMPALA-9176: improve the encapsulation of the null-aware partition.
-  inline Partition* null_aware_partition() const { return null_aware_partition_; }
+  inline Partition* null_aware_partition() const { return null_aware_partition_.get(); }
 
   std::string DebugString() const;
 
@@ -304,10 +308,13 @@
     bool ALWAYS_INLINE IsClosed() const { return build_rows_ == nullptr; }
     BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
     HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); }
+    PartitionId id() const { return id_; }
     bool ALWAYS_INLINE is_spilled() const { return is_spilled_; }
     int ALWAYS_INLINE level() const { return level_; }
     /// Return true if the partition can be spilled - is not closed and is not spilled.
     bool CanSpill() const { return !IsClosed() && !is_spilled(); }
+    int64_t num_spilled_probe_rows() const { return num_spilled_probe_rows_; }
+    void IncrementNumSpilledProbeRows(int64_t count) { num_spilled_probe_rows_ += count; }
 
    private:
     /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'flat_rows' is an array
@@ -325,6 +332,9 @@
 
     const PhjBuilder* parent_;
 
+    /// Id for this partition that is unique within the builder.
+    const PartitionId id_;
+
     /// True if this partition is spilled.
     bool is_spilled_;
 
@@ -340,6 +350,10 @@
     /// transferred to the parent exec node (via the row batch) when the partition
     /// is closed. If NULL, ownership has been transferred and the partition is closed.
     std::unique_ptr<BufferedTupleStream> build_rows_;
+
+    /// The number of spilled probe rows associated with this partition. Updated in
+    /// DoneProbingHashPartitions().
+    int64_t num_spilled_probe_rows_ = 0;
   };
 
   /// Computes the minimum reservation required to execute the spilling partitioned
@@ -374,8 +388,9 @@
   /// After calling this, batches are added to the new partitions by calling Send().
   Status CreateHashPartitions(int level) WARN_UNUSED_RESULT;
 
-  /// Create a new partition in 'all_partitions_' and prepare it for writing.
-  Status CreateAndPreparePartition(int level, Partition** partition) WARN_UNUSED_RESULT;
+  /// Create a new partition and prepare it for writing. Returns an error if initializing
+  /// the partition or allocating the write buffer fails.
+  Status CreateAndPreparePartition(int level, std::unique_ptr<Partition>* partition);
 
   /// Reads the rows in build_batch and partitions them into hash_partitions_. If
   /// 'build_filters' is true, runtime filters are populated. 'is_null_aware' is
@@ -432,7 +447,8 @@
   Status ReserveProbeBuffers(bool input_is_spilled) WARN_UNUSED_RESULT;
 
   /// Returns the number of partitions in 'partitions' that are spilled.
-  static int GetNumSpilledPartitions(const std::vector<Partition*>& partitions);
+  static int GetNumSpilledPartitions(
+      const std::vector<std::unique_ptr<Partition>>& partitions);
 
   /// Transfer reservation for probe streams to 'probe_client'. Memory for one stream was
   /// reserved per spilled partition in FlushFinal(), plus the input stream if the input
@@ -488,8 +504,6 @@
   RuntimeState* const runtime_state_;
 
   /// The ID of the plan join node this is associated with.
-  /// TODO: we may want to replace this with a sink ID once we progress further with
-  /// multithreading.
   const int join_node_id_;
 
   /// The label of the plan join node this is associated with.
@@ -583,23 +597,34 @@
   /// Set in FlushFinal() and not modified until Reset().
   bool non_empty_build_ = false;
 
-  /// Vector that owns all of the Partition objects.
-  std::vector<std::unique_ptr<Partition>> all_partitions_;
+  /// Id to assign to the next partition created.
+  PartitionId next_partition_id_ = 0;
 
   /// The current set of partitions that are being built or probed. This vector is
   /// initialized before partitioning or re-partitioning the build input
   /// and cleared after we've finished probing the partitions.
   /// This is not used when processing a single spilled partition.
-  std::vector<Partition*> hash_partitions_;
+  std::vector<std::unique_ptr<Partition>> hash_partitions_;
+
+  /// Spilled partitions that need further processing. Populated in
+  /// DoneProbingHashPartitions() with the spilled hash partitions.
+  ///
+  /// This is used as a stack to do a depth-first walk of spilled partitions (i.e. more
+  /// finely partitioned partitions are processed first). This allows us to delete spilled
+  /// data and bottom out the recursion earlier.
+  ///
+  /// spilled_partitions_.back() is the spilled partition being processed, if one is
+  /// currently being processed (i.e. between BeginSpilledProbe() and the corresponding
+  /// DoneProbing*() call).
+  std::vector<std::unique_ptr<PhjBuilder::Partition>> spilled_partitions_;
 
   /// Partition used for null-aware joins. This partition is always processed at the end
   /// after all build and probe rows are processed. In this partition's 'build_rows_', we
   /// store all the rows for which 'build_expr_evals_' evaluated over the row returns
   /// NULL (i.e. it has a NULL on the eq join slot).
   /// NULL if the join is not null aware or we are done processing this partition.
-  /// Always NULL once we are done processing the level 0 partitions.
   /// This partition starts off in memory but can be spilled.
-  Partition* null_aware_partition_ = nullptr;
+  std::unique_ptr<Partition> null_aware_partition_;
 
   /// Populated during the hash table building phase if any partitions spilled.
   /// Reservation for one probe stream write buffer per spilled partition is
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index 5038189..81052d1 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -304,7 +304,7 @@
       } else {
         // The build partition is either empty or spilled.
         PhjBuilder::Partition* build_partition =
-            (*build_hash_partitions_.hash_partitions)[partition_idx];
+            (*build_hash_partitions_.hash_partitions)[partition_idx].get();
         ProbePartition* probe_partition = probe_hash_partitions_[partition_idx].get();
         DCHECK((build_partition->IsClosed() && probe_partition == NULL)
             || (build_partition->is_spilled() && probe_partition != NULL));
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index cd66e8c..763111d 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -215,9 +215,7 @@
     if (partition != NULL) partition->Close(row_batch);
   }
   probe_hash_partitions_.clear();
-  for (unique_ptr<ProbePartition>& partition : spilled_partitions_) {
-    partition->Close(row_batch);
-  }
+  for (auto& entry : spilled_partitions_) entry.second->Close(row_batch);
   spilled_partitions_.clear();
   if (input_partition_ != NULL) {
     input_partition_->Close(row_batch);
@@ -227,7 +225,7 @@
     null_aware_probe_partition_->Close(row_batch);
     null_aware_probe_partition_.reset();
   }
-  for (PhjBuilder::Partition* partition : output_build_partitions_) {
+  for (unique_ptr<PhjBuilder::Partition>& partition : output_build_partitions_) {
     partition->Close(row_batch);
   }
   output_build_partitions_.clear();
@@ -376,21 +374,26 @@
   DCHECK(probe_hash_partitions_.empty());
   DCHECK(!spilled_partitions_.empty());
 
-  // TODO: the builder should choose the spilled partition to process.
-  input_partition_ = std::move(spilled_partitions_.front());
-  spilled_partitions_.pop_front();
-  PhjBuilder::Partition* build_partition = input_partition_->build_partition();
-  DCHECK(build_partition->is_spilled());
+  PhjBuilder::Partition* build_input_partition;
+  bool repartitioned;
+  RETURN_IF_ERROR(builder_->BeginSpilledProbe(buffer_pool_client(), &repartitioned,
+      &build_input_partition, &build_hash_partitions_));
+
+  auto it = spilled_partitions_.find(build_input_partition->id());
+  DCHECK(it != spilled_partitions_.end())
+      << "All spilled build partitions must have a corresponding probe partition";
+  input_partition_ = std::move(it->second);
+  spilled_partitions_.erase(it);
+  DCHECK_EQ(build_input_partition, input_partition_->build_partition());
   DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString();
 
-  bool empty_probe = input_partition_->probe_rows()->num_rows() == 0;
-  bool repartitioned;
-  int level;
-  RETURN_IF_ERROR(builder_->BeginSpilledProbe(empty_probe, build_partition,
-      buffer_pool_client(), &repartitioned, &level, &build_hash_partitions_));
-
-  ht_ctx_->set_level(level);
-  if (empty_probe) {
+  ht_ctx_->set_level(build_input_partition->level() + (repartitioned ? 1 : 0));
+  if (!repartitioned && build_input_partition->hash_tbl() == nullptr) {
+    // Build skipped the hash table build, which can only happen if there are no probe
+    // rows.
+    DCHECK_EQ(0, input_partition_->probe_rows()->num_rows())
+        << build_input_partition->DebugString() << endl
+        << input_partition_->probe_rows()->DebugString();
     return Status::OK();
   } else if (repartitioned) {
     RETURN_IF_ERROR(PrepareForPartitionedProbe());
@@ -951,7 +954,8 @@
 
   // Validate the state of the partitions.
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    PhjBuilder::Partition* build_partition = (*build_hash_partitions_.hash_partitions)[i];
+    PhjBuilder::Partition* build_partition =
+        (*build_hash_partitions_.hash_partitions)[i].get();
     ProbePartition* probe_partition = probe_hash_partitions_[i].get();
     if (build_partition->IsClosed()) {
       DCHECK(hash_tbls_[i] == NULL);
@@ -973,7 +977,8 @@
   *have_spilled_hash_partitions = false;
   probe_hash_partitions_.resize(PARTITION_FANOUT);
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    PhjBuilder::Partition* build_partition = (*build_hash_partitions_.hash_partitions)[i];
+    PhjBuilder::Partition* build_partition =
+        (*build_hash_partitions_.hash_partitions)[i].get();
     if (build_partition->IsClosed() || !build_partition->is_spilled()) continue;
     *have_spilled_hash_partitions = true;
     DCHECK(probe_hash_partitions_[i] == nullptr);
@@ -1080,43 +1085,37 @@
     // Need to clean up single in-memory build partition instead of hash partitions.
     DCHECK(build_hash_partitions_.hash_partitions == nullptr);
     DCHECK(input_partition_ != nullptr);
-    builder_->DoneProbingSinglePartition(input_partition_->build_partition(),
+    builder_->DoneProbingSinglePartition(
         &output_build_partitions_, IsLeftSemiJoin(join_op_) ? nullptr : batch);
   } else {
-    // Walk the partitions that had hash tables built for the probe phase and close them.
-    // In the case of right outer and full outer joins, instead of closing those
-    // partitions, add them to the list of partitions that need to output any unmatched
-    // build rows. This partition will be closed by the function that actually outputs
-    // unmatched build rows.
+    // Walk the partitions that had hash tables built for the probe phase and either
+    // close them or move them to 'spilled_partitions_'.
     DCHECK_EQ(build_hash_partitions_.hash_partitions->size(), PARTITION_FANOUT);
     DCHECK_EQ(probe_hash_partitions_.size(), PARTITION_FANOUT);
-    // The build partitions we need to retain for further processing.
-    bool retain_spilled_partition[PARTITION_FANOUT] = {false};
+    int64_t num_spilled_probe_rows[PARTITION_FANOUT] = {0};
     for (int i = 0; i < PARTITION_FANOUT; ++i) {
       ProbePartition* probe_partition = probe_hash_partitions_[i].get();
+      PhjBuilder::Partition* build_partition =
+          (*build_hash_partitions_.hash_partitions)[i].get();
       if (probe_partition == nullptr) {
         // Partition was not spilled.
         if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
           // For NAAJ, we need to try to match the NULL probe rows with this build
           // partition before we are done with it.
-          PhjBuilder::Partition* build_partition =
-              (*build_hash_partitions_.hash_partitions)[i];
           if (!build_partition->IsClosed()) {
             RETURN_IF_ERROR(EvaluateNullProbe(state, build_partition->build_rows()));
           }
         }
       } else if (probe_partition->probe_rows()->num_rows() != 0
           || NeedToProcessUnmatchedBuildRows(join_op_)) {
-        retain_spilled_partition[i] = true;
+        num_spilled_probe_rows[i] = probe_partition->probe_rows()->num_rows();
         // Unpin the probe stream to free up more memory. We need to free all memory so we
         // can recurse the algorithm and create new hash partitions from spilled
         // partitions.
         RETURN_IF_ERROR(
             probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
-        // Push newly created partitions at the front. This means a depth first walk
-        // (more finely partitioned partitions are processed first). This allows us
-        // to delete blocks earlier and bottom out the recursion earlier.
-        spilled_partitions_.push_front(std::move(probe_hash_partitions_[i]));
+        spilled_partitions_.emplace(
+            build_partition->id(), std::move(probe_hash_partitions_[i]));
       } else {
         // There's no more processing to do for this partition, and since there were no
         // probe rows we didn't return any rows that reference memory from these
@@ -1126,7 +1125,7 @@
     }
     probe_hash_partitions_.clear();
     build_hash_partitions_.Reset();
-    builder_->DoneProbingHashPartitions(retain_spilled_partition,
+    builder_->DoneProbingHashPartitions(num_spilled_probe_rows,
         &output_build_partitions_, IsLeftSemiJoin(join_op_) ? nullptr : batch);
   }
   if (input_partition_ != nullptr) {
@@ -1135,7 +1134,7 @@
   }
   if (!output_build_partitions_.empty()) {
     DCHECK(output_unmatched_batch_iter_.get() == nullptr);
-    PhjBuilder::Partition* output_partition = output_build_partitions_.front();
+    PhjBuilder::Partition* output_partition = output_build_partitions_.front().get();
     if (output_partition->hash_tbl() != nullptr) {
       hash_tbl_iterator_ = output_partition->hash_tbl()->FirstUnmatched(ht_ctx_.get());
     } else {
@@ -1184,13 +1183,14 @@
 
   if (!spilled_partitions_.empty()) {
     ss << "SpilledPartitions" << endl;
-    for (const unique_ptr<ProbePartition>& probe_partition : spilled_partitions_) {
+    for (const auto& entry : spilled_partitions_) {
+      ProbePartition* probe_partition = entry.second.get();
       PhjBuilder::Partition* build_partition = probe_partition->build_partition();
       DCHECK(build_partition->is_spilled());
       DCHECK(build_partition->hash_tbl() == NULL);
       DCHECK(build_partition->build_rows() != NULL);
       DCHECK(probe_partition->probe_rows() != NULL);
-      ss << "  Partition=" << probe_partition.get() << endl
+      ss << "  ProbePartition (id=" << entry.first << "):" << probe_partition << endl
          << "   Spilled Build Rows: " << build_partition->build_rows()->num_rows() << endl
          << "   Spilled Probe Rows: " << probe_partition->probe_rows()->num_rows()
          << endl;
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index ba8f141..5ff8ab2 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -557,11 +557,11 @@
   /// have somewhere to spill the probe rows for the spilled partition).
   std::vector<std::unique_ptr<ProbePartition>> probe_hash_partitions_;
 
-  /// The list of probe partitions that have been spilled and still need more
-  /// processing. These partitions could need repartitioning, in which case more
-  /// partitions will be added to this list after repartitioning.
+  /// Probe partitions that have been spilled and still need more processing. Each of
+  /// these has a corresponding build partition in 'builder_' with the same PartitionId.
   /// This list is populated at DoneProbing().
-  std::list<std::unique_ptr<ProbePartition>> spilled_partitions_;
+  std::unordered_map<PhjBuilder::PartitionId, std::unique_ptr<ProbePartition>>
+      spilled_partitions_;
 
   /// The current spilled probe partition being processed as input to repartitioning,
   /// or the source of the probe rows if the hash table fits in memory.
@@ -570,7 +570,7 @@
   /// In the case of right-outer and full-outer joins, this is the list of the partitions
   /// for which we need to output their unmatched build rows. This list is populated at
   /// DoneProbing(). If this is non-empty, probe_state_ must be OUTPUTTING_UNMATCHED.
-  std::list<PhjBuilder::Partition*> output_build_partitions_;
+  std::deque<std::unique_ptr<PhjBuilder::Partition>> output_build_partitions_;
 
   /// Partition used if 'null_aware_' is set. During probing, rows from the probe
   /// side that did not have a match in the hash table are appended to this partition.