IMPALA-9126: part 2: no hash join probe structures in build

This is actually independent of part 1, and can
be merged ahead of it if needed.

This cleans a up a bit of tech debt, where the hash
join builder allocated probe-side streams. This was
implemented before we had reliable memory reservations.
Now we can simply transfer reservation.

The reason things are this way is because the separation
of PhjBuilder from PartitionedHashJoinNode (IMPALA-3567)
happened before we switched to the new BufferPool
(IMPALA-4674). It wasn't possible to reliably
transfer reservations, instead the workaround of
allocating and transferring probe streams was
necessary.

After this change, PartitionedHashJoinBuilder does
not explicitly touch any probe-side data structures.
There is still some implicit sharing of things like
the buffer pool client, which is expected as long
as the builder belongs to the ExecNode.

Testing:
Ran exhaustive tests. We should already have adequate coverage for
spilling and non-spilling hash joins.

Change-Id: I0065f7f44f44f02b7616b1f694178ca42341c42d
Reviewed-on: http://gerrit.cloudera.org:8080/14716
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 818017d..8496703 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -52,8 +52,7 @@
 const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder";
 
 PhjBuilder::PhjBuilder(int join_node_id, const string& join_node_label,
-    TJoinOp::type join_op, const RowDescriptor* probe_row_desc,
-    const RowDescriptor* build_row_desc, RuntimeState* state,
+    TJoinOp::type join_op, const RowDescriptor* build_row_desc, RuntimeState* state,
     BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
     int64_t max_row_buffer_size)
   : DataSink(-1, build_row_desc,
@@ -62,7 +61,6 @@
     join_node_id_(join_node_id),
     join_node_label_(join_node_label),
     join_op_(join_op),
-    probe_row_desc_(probe_row_desc),
     buffer_pool_client_(buffer_pool_client),
     spillable_buffer_size_(spillable_buffer_size),
     max_row_buffer_size_(max_row_buffer_size),
@@ -77,6 +75,7 @@
     build_hash_table_timer_(NULL),
     repartition_timer_(NULL),
     null_aware_partition_(NULL),
+    probe_stream_reservation_(),
     process_build_batch_fn_(NULL),
     process_build_batch_fn_level0_(NULL),
     insert_batch_fn_(NULL),
@@ -151,6 +150,11 @@
 }
 
 Status PhjBuilder::Open(RuntimeState* state) {
+  // Need to init here instead of constructor so that buffer_pool_client_ is registered.
+  if (probe_stream_reservation_.is_closed()) {
+    probe_stream_reservation_.Init(buffer_pool_client_);
+  }
+
   RETURN_IF_ERROR(ht_ctx_->Open(state));
 
   for (const FilterContext& ctx : filter_ctxs_) {
@@ -239,7 +243,7 @@
     RETURN_IF_ERROR(null_aware_partition_->Spill(BufferedTupleStream::UNPIN_ALL));
   }
 
-  RETURN_IF_ERROR(BuildHashTablesAndPrepareProbeStreams());
+  RETURN_IF_ERROR(BuildHashTablesAndReserveProbeBuffers());
   return Status::OK();
 }
 
@@ -254,11 +258,13 @@
   ScalarExpr::Close(filter_exprs_);
   ScalarExpr::Close(build_exprs_);
   obj_pool_.Clear();
+  probe_stream_reservation_.Close();
   DataSink::Close(state);
   closed_ = true;
 }
 
 void PhjBuilder::Reset(RowBatch* row_batch) {
+  DCHECK_EQ(0, probe_stream_reservation_.GetReservation());
   expr_results_pool_->Clear();
   non_empty_build_ = false;
   CloseAndDeletePartitions(row_batch);
@@ -357,7 +363,7 @@
 // For now, we go with a greedy solution.
 //
 // TODO: implement the knapsack solution.
-Status PhjBuilder::BuildHashTablesAndPrepareProbeStreams() {
+Status PhjBuilder::BuildHashTablesAndReserveProbeBuffers() {
   DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());
 
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
@@ -372,11 +378,17 @@
     }
   }
 
+  // TODO: the below logic could be improved calculating upfront how much memory is needed
+  // for each hash table, and only building hash tables that will eventually fit in
+  // memory. In some cases now we could build a hash table, then spill the partition
+  // later.
+
   // Allocate probe buffers for all partitions that are already spilled. Do this before
   // building hash tables because allocating probe buffers may cause some more partitions
   // to be spilled. This avoids wasted work on building hash tables for partitions that
   // won't fit in memory alongside the required probe buffers.
-  RETURN_IF_ERROR(InitSpilledPartitionProbeStreams());
+  bool input_is_spilled = ht_ctx_->level() > 0;
+  RETURN_IF_ERROR(ReserveProbeBuffers(input_is_spilled));
 
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
     Partition* partition = hash_partitions_[i];
@@ -389,66 +401,55 @@
     // partition (clean up the hash table, unpin build).
     if (!built) RETURN_IF_ERROR(partition->Spill(BufferedTupleStream::UNPIN_ALL));
   }
-
   // We may have spilled additional partitions while building hash tables, we need to
-  // initialize probe buffers for them.
-  // TODO: once we have reliable reservations (IMPALA-3200) this should no longer be
-  // necessary: we will know exactly how many partitions will fit in memory and we can
-  // avoid building then immediately destroying hash tables.
-  RETURN_IF_ERROR(InitSpilledPartitionProbeStreams());
-
-  // TODO: at this point we could have freed enough memory to pin and build some
-  // spilled partitions. This can happen, for example if there is a lot of skew.
-  // Partition 1: 10GB (pinned initially).
-  // Partition 2,3,4: 1GB (spilled during partitioning the build).
-  // In the previous step, we could have unpinned 10GB (because there was not enough
-  // memory to build a hash table over it) which can now free enough memory to
-  // build hash tables over the remaining 3 partitions.
-  // We start by spilling the largest partition though so the build input would have
-  // to be pretty pathological.
-  // Investigate if this is worthwhile.
+  // reserve memory for the probe buffers for those additional spilled partitions.
+  RETURN_IF_ERROR(ReserveProbeBuffers(input_is_spilled));
   return Status::OK();
 }
 
-Status PhjBuilder::InitSpilledPartitionProbeStreams() {
+Status PhjBuilder::ReserveProbeBuffers(bool input_is_spilled) {
   DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());
 
-  int num_spilled_partitions = 0;
-  for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    Partition* partition = hash_partitions_[i];
-    if (!partition->IsClosed() && partition->is_spilled()) ++num_spilled_partitions;
-  }
-  int probe_streams_to_create =
-      num_spilled_partitions - spilled_partition_probe_streams_.size();
+  // We need a write buffer for probe rows for each spilled partition, and a read buffer
+  // if the input is a spilled partition (i.e. that we are repartitioning the input).
+  int num_probe_streams = GetNumSpilledHashPartitions() + (input_is_spilled ? 1 : 0);
+  int64_t per_stream_reservation = spillable_buffer_size_;
+  int64_t addtl_reservation = num_probe_streams * per_stream_reservation
+      - probe_stream_reservation_.GetReservation();
 
-  while (probe_streams_to_create > 0) {
-    // Create stream in vector, so that it will be cleaned up after any failure.
-    spilled_partition_probe_streams_.emplace_back(
-        make_unique<BufferedTupleStream>(runtime_state_, probe_row_desc_,
-            buffer_pool_client_, spillable_buffer_size_, max_row_buffer_size_));
-    BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get();
-    RETURN_IF_ERROR(probe_stream->Init(join_node_label_, false));
-
-    // Loop until either the stream gets a buffer or all partitions are spilled (in which
-    // case SpillPartition() returns an error).
-    while (true) {
-      bool got_buffer;
-      RETURN_IF_ERROR(probe_stream->PrepareForWrite(&got_buffer));
-      if (got_buffer) break;
-
-      Partition* spilled_partition;
-      RETURN_IF_ERROR(SpillPartition(
-            BufferedTupleStream::UNPIN_ALL, &spilled_partition));
-      // Don't need to create a probe stream for the null-aware partition.
-      if (spilled_partition != null_aware_partition_) ++probe_streams_to_create;
+  // Loop until either we get enough reservation or all partitions are spilled (in which
+  // case SpillPartition() returns an error).
+  while (addtl_reservation > buffer_pool_client_->GetUnusedReservation()) {
+    Partition* spilled_partition;
+    RETURN_IF_ERROR(SpillPartition(
+          BufferedTupleStream::UNPIN_ALL, &spilled_partition));
+    // Don't need to create a probe stream for the null-aware partition.
+    if (spilled_partition != null_aware_partition_) {
+      addtl_reservation += per_stream_reservation;
     }
-    --probe_streams_to_create;
   }
+  buffer_pool_client_->SaveReservation(&probe_stream_reservation_, addtl_reservation);
   return Status::OK();
 }
 
-vector<unique_ptr<BufferedTupleStream>> PhjBuilder::TransferProbeStreams() {
-  return std::move(spilled_partition_probe_streams_);
+void PhjBuilder::TransferProbeStreamReservation(BufferPool::ClientHandle* dst) {
+  int num_streams = GetNumSpilledHashPartitions();
+  int64_t saved_reservation = probe_stream_reservation_.GetReservation();
+  DCHECK_GE(saved_reservation, spillable_buffer_size_ * num_streams);
+
+  // TODO: in future we may need to support different clients for the probe.
+  DCHECK_EQ(dst, buffer_pool_client_);
+  dst->RestoreReservation(&probe_stream_reservation_, saved_reservation);
+}
+
+int PhjBuilder::GetNumSpilledHashPartitions() const {
+  int num_spilled = 0;
+  for (int i = 0; i < hash_partitions_.size(); ++i) {
+    Partition* partition = hash_partitions_[i];
+    DCHECK(partition != nullptr);
+    if (!partition->IsClosed() && partition->is_spilled()) ++num_spilled;
+  }
+  return num_spilled;
 }
 
 void PhjBuilder::DoneProbing(const bool retain_partition[PARTITION_FANOUT],
@@ -479,13 +480,6 @@
   all_partitions_.clear();
   hash_partitions_.clear();
   null_aware_partition_ = NULL;
-  for (unique_ptr<BufferedTupleStream>& stream : spilled_partition_probe_streams_) {
-    // Streams need to be cleaned up, but were never handed off to probe, so won't have
-    // any data in them.
-    DCHECK_EQ(0, stream->num_rows());
-    stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-  }
-  spilled_partition_probe_streams_.clear();
 }
 
 void PhjBuilder::AllocateRuntimeFilters() {
@@ -554,23 +548,23 @@
   }
 }
 
-Status PhjBuilder::RepartitionBuildInput(
-    Partition* input_partition, int level, BufferedTupleStream* input_probe_rows) {
-  DCHECK_GE(level, 1);
+Status PhjBuilder::RepartitionBuildInput(Partition* input_partition) {
+  int new_level = input_partition->level() + 1;
+  DCHECK_GE(new_level, 1);
   SCOPED_TIMER(repartition_timer_);
   COUNTER_ADD(num_repartitions_, 1);
   RuntimeState* state = runtime_state_;
 
   // Setup the read buffer and the new partitions.
   BufferedTupleStream* build_rows = input_partition->build_rows();
-  DCHECK(build_rows != NULL);
+  DCHECK(build_rows != nullptr);
   bool got_read_buffer;
   RETURN_IF_ERROR(build_rows->PrepareForRead(true, &got_read_buffer));
   if (!got_read_buffer) {
     return mem_tracker()->MemLimitExceeded(
         state, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, join_node_id_));
   }
-  RETURN_IF_ERROR(CreateHashPartitions(level));
+  RETURN_IF_ERROR(CreateHashPartitions(new_level));
 
   // Repartition 'input_stream' into 'hash_partitions_'.
   RowBatch build_batch(row_desc_, state->batch_size(), mem_tracker());
@@ -585,19 +579,7 @@
   }
 
   // Done reading the input, we can safely close it now to free memory.
-  input_partition->Close(NULL);
-
-  // We just freed up the buffer used for reading build rows. Ensure a buffer is
-  // allocated for reading probe rows before we build the hash tables in FlushFinal().
-  // TODO: once we have reliable reservations (IMPALA-3200) we can just hand off the
-  // reservation and avoid this complication.
-  while (true) {
-    bool got_buffer;
-    RETURN_IF_ERROR(input_probe_rows->PrepareForRead(true, &got_buffer));
-    if (got_buffer) break;
-    RETURN_IF_ERROR(SpillPartition(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT));
-  }
-
+  input_partition->Close(nullptr);
   RETURN_IF_ERROR(FlushFinal(state));
   return Status::OK();
 }
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 7e9fdf7..6397756 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -48,9 +48,9 @@
 /// The builder owns the hash tables and build row streams. The builder first does the
 /// level 0 partitioning of build rows. After FlushFinal() the builder has produced some
 /// in-memory partitions and some spilled partitions. The in-memory partitions have hash
-/// tables and the spilled partitions have a probe-side stream prepared with one write
-/// buffer, which is sufficient to spill the partition's probe rows to disk without
-/// allocating additional buffers.
+/// tables and the spilled partitions have memory reserved for a probe-side stream with
+/// one write buffer, which is sufficient to spill the partition's probe rows to disk
+/// without allocating additional buffers.
 ///
 /// After this initial partitioning, the join node probes the in-memory hash partitions.
 /// The join node then drives processing of any spilled partitions, calling
@@ -61,12 +61,9 @@
 /// Both the PartitionedHashJoinNode and the builder share a BufferPool client
 /// and the corresponding reservations. Different stages of the spilling algorithm
 /// require different mixes of build and probe buffers and hash tables, so we can
-/// share the reservation to minimize the combined memory requirement. Initial probe-side
-/// buffers are allocated in the builder then handed off to the probe side to implement
-/// this reservation sharing.
-///
-/// TODO: after we have reliable reservations (IMPALA-3200), we can simplify the handoff
-///   to the probe side by using reservations instead of preparing the streams.
+/// share the reservation to minimize the combined memory requirement. Memory for
+/// probe-side buffers is reserved in the builder then handed off to the probe side
+/// to implement this reservation sharing.
 ///
 /// The full hash join algorithm is documented in PartitionedHashJoinNode.
 class PhjBuilder : public DataSink {
@@ -91,9 +88,9 @@
   class Partition;
 
   PhjBuilder(int join_node_id, const std::string& join_node_label, TJoinOp::type join_op,
-      const RowDescriptor* probe_row_desc, const RowDescriptor* build_row_desc,
-      RuntimeState* state, BufferPool::ClientHandle* buffer_pool_client,
-      int64_t spillable_buffer_size, int64_t max_row_buffer_size);
+      const RowDescriptor* build_row_desc, RuntimeState* state,
+      BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
+      int64_t max_row_buffer_size);
 
   Status InitExprsAndFilters(RuntimeState* state,
       const std::vector<TEqJoinCondition>& eq_join_conjuncts,
@@ -118,10 +115,9 @@
   /// Reset the builder to the same state as it was in after calling Open().
   void Reset(RowBatch* row_batch);
 
-  /// Transfer ownership of the probe streams to the caller. One stream was allocated per
-  /// spilled partition in FlushFinal(). The probe streams are empty but prepared for
-  /// writing with a write buffer allocated.
-  std::vector<std::unique_ptr<BufferedTupleStream>> TransferProbeStreams();
+  /// Transfer reservation for probe streams to 'dst'. Memory for one stream was reserved
+  /// per spilled partition in FlushFinal().
+  void TransferProbeStreamReservation(BufferPool::ClientHandle* dst);
 
   /// Called after probing of the partitions is done. Appends in-memory partitions that
   /// may contain build rows to output to 'output_partitions' for build modes like
@@ -143,13 +139,12 @@
     }
   }
 
-  /// Creates new hash partitions and repartitions 'input_partition'. The previous
-  /// hash partitions must have been cleared with ClearHashPartitions().
-  /// 'level' is the level new partitions should be created with. This functions prepares
-  /// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase
-  /// has enough buffers preallocated to execute successfully.
-  Status RepartitionBuildInput(Partition* input_partition, int level,
-      BufferedTupleStream* input_probe_rows) WARN_UNUSED_RESULT;
+  /// Creates new hash partitions and repartitions 'input_partition' into PARTITION_FANOUT
+  /// new partitions with level input_partition->level() + 1. The previous hash partitions
+  /// must have been cleared with ClearHashPartitions(). This function reserves enough
+  /// memory for a read buffer for the input probe stream and a write buffer for each
+  /// spilled partition after repartitioning.
+  Status RepartitionBuildInput(Partition* input_partition) WARN_UNUSED_RESULT;
 
   /// Returns the largest build row count out of the current hash partitions.
   int64_t LargestPartitionRows() const;
@@ -326,25 +321,32 @@
   /// Tries to build hash tables for all unspilled hash partitions. Called after
   /// FlushFinal() when all build rows have been partitioned and added to the appropriate
   /// streams. If the hash table could not be built for a partition, the partition is
-  /// spilled (with all build blocks unpinned) and the probe stream is prepared for
-  /// writing (i.e. has an initial probe buffer allocated).
+  /// spilled (with all build blocks unpinned) and memory reservation is set aside
+  /// for a write buffer for the output probe streams, and, if this input is a spilled
+  /// partitioned, a read buffer for the input probe stream.
   ///
   /// When this function returns successfully, each partition is in one of these states:
-  /// 1. closed. No probe partition is created and the build partition is closed.
-  /// 2. in-memory. The build rows are pinned and has a hash table built. No probe
-  ///     partition is created.
+  /// 1. closed. No probe partition is created and the build partition is closed. No
+  ///       probe stream memory is reserved for this partition.
+  /// 2. in-memory. The build rows are pinned and has a hash table built. No
+  ///       probe stream memory is reserved for this partition.
   /// 3. spilled. The build rows are fully unpinned and the probe stream is prepared.
-  Status BuildHashTablesAndPrepareProbeStreams() WARN_UNUSED_RESULT;
+  ///       Memory for a probe stream write buffer is reserved for this partition.
+  Status BuildHashTablesAndReserveProbeBuffers() WARN_UNUSED_RESULT;
 
-  /// Ensures that 'spilled_partition_probe_streams_' has a stream per spilled partition
-  /// in 'hash_partitions_'. May spill additional partitions until it can create enough
-  /// probe streams with write buffers. Returns an error if an error is encountered or
-  /// if it runs out of partitions to spill.
-  Status InitSpilledPartitionProbeStreams() WARN_UNUSED_RESULT;
+  /// Ensures that 'probe_stream_reservation_' has enough reservation for a stream per
+  /// spilled partition in 'hash_partitions_', plus for the input stream if the input
+  /// is a spilled partition (indicated by input_is_spilled). May spill additional
+  /// partitions until it can free enough reservation. Returns an error if an error
+  /// is encountered or if it runs out of partitions to spill.
+  Status ReserveProbeBuffers(bool input_is_spilled) WARN_UNUSED_RESULT;
+
+  /// Returns the number of partitions in 'hash_partitions_' that are spilled.
+  int GetNumSpilledHashPartitions() const;
 
   /// Calls Close() on every Partition, deletes them, and cleans up any pointers that
-  /// may reference them. Also cleans up 'spilled_partition_probe_streams_'. If
-  /// 'row_batch' if not NULL, transfers the ownership of all row-backing resources to it.
+  /// may reference them. If 'row_batch' if not NULL, transfers the ownership of all
+  /// row-backing resources to it.
   void CloseAndDeletePartitions(RowBatch* row_batch);
 
   /// For each filter in filters_, allocate a bloom_filter from the fragment-local
@@ -391,9 +393,6 @@
   /// The join operation this is building for.
   const TJoinOp::type join_op_;
 
-  /// Descriptor for the probe rows, needed to initialize probe streams.
-  const RowDescriptor* probe_row_desc_;
-
   /// Pool for objects with same lifetime as builder.
   ObjectPool obj_pool_;
 
@@ -406,7 +405,7 @@
   /// 1:N relationship from builders to join nodes.
   BufferPool::ClientHandle* buffer_pool_client_;
 
-  /// The default and max buffer sizes to use in the build and probe streams.
+  /// The default and max buffer sizes to use in the build streams.
   const int64_t spillable_buffer_size_;
   const int64_t max_row_buffer_size_;
 
@@ -487,19 +486,21 @@
   Partition* null_aware_partition_;
 
   /// Populated during the hash table building phase if any partitions spilled.
-  /// One probe stream per spilled partition is prepared for writing so that the
-  /// initial write buffer is allocated.
+  /// Reservation for one probe stream write buffer per spilled partition is
+  /// saved to be handed off to PartitionedHashJoinNode for use in buffering
+  /// spilled probe rows.
   ///
-  /// These streams are handed off to PartitionedHashJoinNode for use in buffering
-  /// spilled probe rows. The allocation is done in the builder so that it can divide
-  /// memory between the in-memory build partitions and write buffers based on the size
-  /// of the partitions and available memory. E.g. if all the partitions fit in memory, no
-  /// write buffers need to be allocated, but if some partitions are spilled, more build
-  /// partitions may be spilled to free up memory for write buffers.
+  /// The allocation is done in the builder so that it can divide memory between the
+  /// in-memory build partitions and write buffers based on the size of the partitions
+  /// and available memory. E.g. if all the partitions fit in memory, no write buffers
+  /// need to be allocated, but if some partitions are spilled, more build partitions
+  /// may be spilled to free up memory for write buffers.
   ///
   /// Because of this, at the end of the build phase, we always have sufficient memory
   /// to execute the probe phase of the algorithm without spilling more partitions.
-  std::vector<std::unique_ptr<BufferedTupleStream>> spilled_partition_probe_streams_;
+  ///
+  /// Initialized in Open() and closed in Closed().
+  BufferPool::SubReservation probe_stream_reservation_;
 
   /// END: Members that must be Reset()
   /////////////////////////////////////////
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 880a528..293171d 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -68,7 +68,7 @@
   // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
   // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
   // being separated out further.
-  builder_.reset(new PhjBuilder(id(), label(), join_op_, child(0)->row_desc(),
+  builder_.reset(new PhjBuilder(id(), label(), join_op_,
       child(1)->row_desc(), state, buffer_pool_client(),
       resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size));
   RETURN_IF_ERROR(
@@ -256,17 +256,26 @@
 }
 
 PartitionedHashJoinNode::ProbePartition::ProbePartition(RuntimeState* state,
-    PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition,
-    unique_ptr<BufferedTupleStream> probe_rows)
+    PartitionedHashJoinNode* parent, PhjBuilder::Partition* build_partition)
   : build_partition_(build_partition),
-    probe_rows_(std::move(probe_rows)) {
-  DCHECK(probe_rows_->has_write_iterator());
-}
+    probe_rows_(make_unique<BufferedTupleStream>(state,
+      parent->child(0)->row_desc(), parent->buffer_pool_client(),
+      parent->resource_profile_.spillable_buffer_size,
+      parent->resource_profile_.max_row_buffer_size)) {}
 
 PartitionedHashJoinNode::ProbePartition::~ProbePartition() {
   DCHECK(IsClosed());
 }
 
+Status PartitionedHashJoinNode::ProbePartition::PrepareForWrite(
+    PartitionedHashJoinNode* parent, bool pinned) {
+  RETURN_IF_ERROR(probe_rows_->Init(parent->label(), pinned));
+  bool got_buffer;
+  RETURN_IF_ERROR(probe_rows_->PrepareForWrite(&got_buffer));
+  DCHECK(got_buffer) << "Should have already acquired reservation";
+  return Status::OK();
+}
+
 Status PartitionedHashJoinNode::ProbePartition::PrepareForRead() {
   bool got_read_buffer;
   RETURN_IF_ERROR(probe_rows_->PrepareForRead(true, &got_read_buffer));
@@ -444,8 +453,7 @@
     DCHECK_EQ(build_partition->build_rows()->BytesPinned(false), 0) << NodeDebugString();
     DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString();
     int64_t num_input_rows = build_partition->build_rows()->num_rows();
-    RETURN_IF_ERROR(builder_->RepartitionBuildInput(
-        build_partition, next_partition_level, input_partition_->probe_rows()));
+    RETURN_IF_ERROR(builder_->RepartitionBuildInput(build_partition));
 
     // Check if there was any reduction in the size of partitions after repartitioning.
     int64_t largest_partition_rows = builder_->LargestPartitionRows();
@@ -456,7 +464,6 @@
           next_partition_level, num_input_rows, NodeDebugString(),
           buffer_pool_client()->DebugString());
     }
-
     RETURN_IF_ERROR(PrepareForProbe());
     UpdateState(HashJoinState::REPARTITIONING_PROBE);
   } else {
@@ -867,26 +874,10 @@
 }
 
 Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
-  RuntimeState* state = runtime_state_;
-  unique_ptr<BufferedTupleStream> probe_rows =
-      make_unique<BufferedTupleStream>(state, child(0)->row_desc(), buffer_pool_client(),
-          resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size);
-  Status status = probe_rows->Init(label(), true);
-  if (!status.ok()) goto error;
-  bool got_buffer;
-  status = probe_rows->PrepareForWrite(&got_buffer);
-  if (!status.ok()) goto error;
-  DCHECK(got_buffer) << "Accounted in min reservation"
-                     << buffer_pool_client()->DebugString();
-  null_aware_probe_partition_.reset(new ProbePartition(
-      state, this, builder_->null_aware_partition(), std::move(probe_rows)));
+  null_aware_probe_partition_.reset(
+      new ProbePartition(runtime_state_, this, builder_->null_aware_partition()));
+  RETURN_IF_ERROR(null_aware_probe_partition_->PrepareForWrite(this, true));
   return Status::OK();
-
-error:
-  DCHECK(!status.ok());
-  // Ensure the temporary 'probe_rows' stream is closed correctly on error.
-  probe_rows->Close(NULL, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-  return status;
 }
 
 Status PartitionedHashJoinNode::InitNullProbeRows() {
@@ -1007,19 +998,18 @@
   DCHECK(probe_hash_partitions_.empty());
 
   // Initialize the probe partitions, providing them with probe streams.
-  vector<unique_ptr<BufferedTupleStream>> probe_streams =
-      builder_->TransferProbeStreams();
-  probe_hash_partitions_.resize(PARTITION_FANOUT);
-  bool have_spilled_hash_partitions = false;
-  for (int i = 0; i < PARTITION_FANOUT; ++i) {
-    PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
-    if (build_partition->IsClosed() || !build_partition->is_spilled()) continue;
-    have_spilled_hash_partitions = true;
-    DCHECK(!probe_streams.empty()) << "Builder should have created enough streams";
-    CreateProbePartition(i, std::move(probe_streams.back()));
-    probe_streams.pop_back();
+  vector<unique_ptr<BufferedTupleStream>> probe_streams;
+  builder_->TransferProbeStreamReservation(buffer_pool_client());
+  if (input_partition_ != nullptr) {
+    // This is a spilled partition - we need to read the probe rows. Memory was reserved
+    // in RepartitionBuildInput() for the input stream's read buffer.
+    bool got_buffer;
+    RETURN_IF_ERROR(input_partition_->probe_rows()->PrepareForRead(true, &got_buffer));
+    DCHECK(got_buffer) << "Memory should have been reserved by builder";
   }
-  DCHECK(probe_streams.empty()) << "Builder should not have created extra streams";
+
+  bool have_spilled_hash_partitions;
+  RETURN_IF_ERROR(CreateProbeHashPartitions(&have_spilled_hash_partitions));
 
   // Unpin null-aware probe streams if any partitions spilled: we don't want to waste
   // memory pinning the probe streams that might be needed to process spilled partitions.
@@ -1055,13 +1045,22 @@
   return Status::OK();
 }
 
-void PartitionedHashJoinNode::CreateProbePartition(
-    int partition_idx, unique_ptr<BufferedTupleStream> probe_rows) {
-  DCHECK_GE(partition_idx, 0);
-  DCHECK_LT(partition_idx, probe_hash_partitions_.size());
-  DCHECK(probe_hash_partitions_[partition_idx] == NULL);
-  probe_hash_partitions_[partition_idx] = make_unique<ProbePartition>(runtime_state_,
-      this, builder_->hash_partition(partition_idx), std::move(probe_rows));
+Status PartitionedHashJoinNode::CreateProbeHashPartitions(
+    bool* have_spilled_hash_partitions) {
+  *have_spilled_hash_partitions = false;
+  probe_hash_partitions_.resize(PARTITION_FANOUT);
+  for (int i = 0; i < PARTITION_FANOUT; ++i) {
+    PhjBuilder::Partition* build_partition = builder_->hash_partition(i);
+    if (build_partition->IsClosed() || !build_partition->is_spilled()) continue;
+    *have_spilled_hash_partitions = true;
+    DCHECK(probe_hash_partitions_[i] == nullptr);
+    // Put partition into vector so it will be cleaned up in CloseAndDeletePartitions()
+    // if Init() fails.
+    probe_hash_partitions_[i] =
+        make_unique<ProbePartition>(runtime_state_, this, builder_->hash_partition(i));
+    RETURN_IF_ERROR(probe_hash_partitions_[i]->PrepareForWrite(this, false));
+  }
+  return Status::OK();
 }
 
 bool PartitionedHashJoinNode::AppendProbeRowSlow(
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index 126beb1..a134d54 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -231,10 +231,11 @@
   /// After this function returns, all partitions are ready to process probe rows.
   Status PrepareForProbe() WARN_UNUSED_RESULT;
 
-  /// Creates an initialized probe partition at 'partition_idx' in
-  /// 'probe_hash_partitions_'.
-  void CreateProbePartition(
-      int partition_idx, std::unique_ptr<BufferedTupleStream> probe_rows);
+  // Initialize 'probe_hash_partitions_'. Each spilled build partition gets a
+  // corresponding probe partition. Closed or in-memory build partitions do
+  // not get a probe partition. If an error is encountered, 'probe_hash_partitions_'
+  // may be left with some partitions and will be cleaned up by Close().
+  Status CreateProbeHashPartitions(bool* have_spilled_hash_partitions);
 
   /// Append the probe row 'row' to 'stream'. The stream must be unpinned and must have
   /// a write buffer allocated, so this will succeed unless an error is encountered.
@@ -629,13 +630,16 @@
   /// disk for later processing.
   class ProbePartition {
    public:
-    /// Create a new probe partition. 'probe_rows' should be an empty unpinned stream
-    /// that has been prepared for writing with an I/O-sized write buffer.
+    /// Create a new probe partition for the same hash partition as 'build_partition'.
     ProbePartition(RuntimeState* state, PartitionedHashJoinNode* parent,
-        PhjBuilder::Partition* build_partition,
-        std::unique_ptr<BufferedTupleStream> probe_rows);
+        PhjBuilder::Partition* build_partition);
     ~ProbePartition();
 
+    /// Prepare to write the probe rows. Allocates the first write block. This stream
+    /// is unpinned so writes should not fail with out of memory if this succeeds.
+    /// Returns an error if the first write block cannot be acquired.
+    Status PrepareForWrite(PartitionedHashJoinNode* parent, bool pinned);
+
     /// Prepare to read the probe rows. Allocates the first read block, so reads will
     /// not fail with out of memory if this succeeds. Returns an error if the first read
     /// block cannot be acquired. "delete_on_read" mode is used, so the blocks backing
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 891216e..4b70f7a 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -344,12 +344,14 @@
 }
 
 void BufferPool::ClientHandle::SaveReservation(SubReservation* dst, int64_t bytes) {
+  DCHECK(!dst->is_closed());
   DCHECK_EQ(dst->tracker_->parent(), impl_->reservation());
   bool success = impl_->reservation()->TransferReservationTo(dst->tracker_.get(), bytes);
   DCHECK(success); // SubReservation should not have a limit, so this shouldn't fail.
 }
 
 void BufferPool::ClientHandle::RestoreReservation(SubReservation* src, int64_t bytes) {
+  DCHECK(!src->is_closed());
   DCHECK_EQ(src->tracker_->parent(), impl_->reservation());
   bool success = src->tracker_->TransferReservationTo(impl_->reservation(), bytes);
   DCHECK(success); // Transferring reservation to parent shouldn't fail.
@@ -363,7 +365,18 @@
   return impl_->has_unpinned_pages();
 }
 
+BufferPool::SubReservation::SubReservation() {
+  DCHECK(is_closed()) << "subreservation must be closed.";
+}
+
 BufferPool::SubReservation::SubReservation(ClientHandle* client) {
+  DCHECK(client->is_registered());
+  Init(client);
+}
+
+void BufferPool::SubReservation::Init(ClientHandle* client) {
+  DCHECK(tracker_ == nullptr);
+  DCHECK(client->is_registered());
   tracker_.reset(new ReservationTracker);
   tracker_->InitChildTracker(
       nullptr, client->impl_->reservation(), nullptr, numeric_limits<int64_t>::max());
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index b2ba24c..d7c74db 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -408,14 +408,20 @@
 /// Helper class that allows dividing up a client's reservation into separate buckets.
 class BufferPool::SubReservation {
  public:
+  // Construct without initializing this SubReservation.
+  SubReservation();
+  // Construct and initialize with 'client' as the parent.
   SubReservation(ClientHandle* client);
   ~SubReservation();
 
+  // Initialize with 'client' as the parent.
+  void Init(ClientHandle* client);
+
   /// Returns the amount of reservation stored in this sub-reservation.
   int64_t GetReservation() const;
 
   /// Releases the sub-reservation to the client's tracker. Must be called before
-  /// destruction.
+  /// destruction if this was initialized.
   void Close();
 
   bool is_closed() const { return tracker_ == nullptr; }