IMPALA-9127: explicit probe state machine in hash join

This refactors the main loop in PartitionedHashJoinNode::GetNext()
to use an explicit state machine, rather than the hard-to-follow
implicit state machine previously used. A new state variable
'probe_state_' is used to drive the loop, with DCHECKs enforcing
invariants of other member variables.

I deliberately kept changes to other functions to a minimum (and
did not attempt to factor logic out of GetNext()) in order to
limit the scope of this patch.

The new logic is mostly equivalent to the old logic, although there
may be a different number of trips through the loop because of the
way the cascading checks in the old version worked. A few notable
changes:
* DoneProbing() is consistently called when probing is finished,
  including in cases, like probing a single spilled partition, where
  it wasn't previously.
* The repeated AtCapacity() checks are consolidated into a single
  check that happens at the end of the loop. Resources attached
  to batches should still be flushed at the appropriate points,
  since each previous "if (out_batch->AtCapacity()) break;"
  corresponds to a new loop iteration in the new code.
* OutputNullAwareNullProbe() and OutputNullAwareProbeRows() now
  explicitly signal when they are done using an output argument,
  instead of implicitly via AtCapacity(), which is incredibly
  error-prone.

Testing:
We have adequate coverage for different join modes, including
with spilling.

* Ran exhaustive tests.
* Ran a single-node stress test with TPC-H and TPC-DS.
* Ran a single-node stress test with a larger scale factor.

Change-Id: I32ebdf0054d2ce4562b851439e300323601fb064
Reviewed-on: http://gerrit.cloudera.org:8080/14688
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index a2a63c1..f316928 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -365,8 +365,9 @@
 template <int const JoinOp>
 int PartitionedHashJoinNode::ProcessProbeBatch(TPrefetchMode::type prefetch_mode,
     RowBatch* out_batch, HashTableCtx* __restrict__ ht_ctx, Status* __restrict__ status) {
-  DCHECK(state_ == PARTITIONING_PROBE || state_ == PROBING_SPILLED_PARTITION
-      || state_ == REPARTITIONING_PROBE);
+  DCHECK(state_ == HashJoinState::PARTITIONING_PROBE
+      || state_ == HashJoinState::PROBING_SPILLED_PARTITION
+      || state_ == HashJoinState::REPARTITIONING_PROBE);
   ScalarExprEvaluator* const* other_join_conjunct_evals =
       other_join_conjunct_evals_.data();
   const int num_other_join_conjuncts = other_join_conjunct_evals_.size();
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index fd9703f..880a528 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -51,14 +51,7 @@
 PartitionedHashJoinNode::PartitionedHashJoinNode(
     ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
   : BlockingJoinNode(
-        "PartitionedHashJoinNode", tnode.hash_join_node.join_op, pool, tnode, descs),
-    num_probe_rows_partitioned_(NULL),
-    null_aware_eval_timer_(NULL),
-    state_(PARTITIONING_BUILD),
-    output_null_aware_probe_rows_running_(false),
-    null_probe_output_idx_(-1),
-    process_probe_batch_fn_(NULL),
-    process_probe_batch_fn_level0_(NULL) {
+        "PartitionedHashJoinNode", tnode.hash_join_node.join_op, pool, tnode, descs) {
   memset(hash_tbls_, 0, sizeof(HashTable*) * PARTITION_FANOUT);
 }
 
@@ -177,9 +170,10 @@
   RETURN_IF_ERROR(BlockingJoinNode::ProcessBuildInputAndOpenProbe(state, builder_.get()));
   RETURN_IF_ERROR(PrepareForProbe());
 
-  UpdateState(PARTITIONING_PROBE);
+  UpdateState(HashJoinState::PARTITIONING_PROBE);
   RETURN_IF_ERROR(BlockingJoinNode::GetFirstProbeRow(state));
   ResetForProbe();
+  probe_state_ = ProbeState::PROBING_IN_BATCH;
   DCHECK(null_aware_probe_partition_ == NULL
       || join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
   return Status::OK();
@@ -203,9 +197,8 @@
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_probe_output_idx_ = -1;
     matched_null_probe_.clear();
-    output_null_aware_probe_rows_running_ = false;
   }
-  state_ = PARTITIONING_BUILD;
+  state_ = HashJoinState::PARTITIONING_BUILD;
   ht_ctx_->set_level(0);
   CloseAndDeletePartitions(row_batch);
   builder_->Reset(IsLeftSemiJoin(join_op_) ? nullptr : row_batch);
@@ -250,7 +243,6 @@
   if (is_closed()) return;
   if (ht_ctx_ != nullptr) ht_ctx_->Close(state);
   ht_ctx_.reset();
-  output_null_aware_probe_rows_running_ = false;
   output_unmatched_batch_.reset();
   output_unmatched_batch_iter_.reset();
   CloseAndDeletePartitions(nullptr);
@@ -293,7 +285,30 @@
 
 Status PartitionedHashJoinNode::NextProbeRowBatch(
     RuntimeState* state, RowBatch* out_batch) {
-  DCHECK_EQ(state_, PARTITIONING_PROBE);
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_END_BATCH);
+  DCHECK(probe_batch_pos_ == probe_batch_->num_rows() || probe_batch_pos_ == -1);
+  if (state_ == HashJoinState::PARTITIONING_PROBE) {
+    DCHECK(input_partition_ == nullptr);
+    RETURN_IF_ERROR(NextProbeRowBatchFromChild(state, out_batch));
+  } else {
+    DCHECK(state_ == HashJoinState::REPARTITIONING_PROBE
+        || state_ == HashJoinState::PROBING_SPILLED_PARTITION)
+        << PrintState();
+    DCHECK(probe_side_eos_);
+    DCHECK(input_partition_ != nullptr);
+    RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
+  }
+  // Free expr result allocations of the probe side expressions only after
+  // ExprValuesCache has been reset.
+  DCHECK(ht_ctx_->expr_values_cache()->AtEnd());
+  probe_expr_results_pool_->Clear();
+  return Status::OK();
+}
+
+Status PartitionedHashJoinNode::NextProbeRowBatchFromChild(
+    RuntimeState* state, RowBatch* out_batch) {
+  DCHECK_ENUM_EQ(state_, HashJoinState::PARTITIONING_PROBE);
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_END_BATCH);
   DCHECK(probe_batch_pos_ == probe_batch_->num_rows() || probe_batch_pos_ == -1);
   do {
     // Loop until we find a non-empty row batch.
@@ -319,7 +334,9 @@
 Status PartitionedHashJoinNode::NextSpilledProbeRowBatch(
     RuntimeState* state, RowBatch* out_batch) {
   DCHECK(input_partition_ != NULL);
-  DCHECK(state_ == PROBING_SPILLED_PARTITION || state_ == REPARTITIONING_PROBE);
+  DCHECK(state_ == HashJoinState::PROBING_SPILLED_PARTITION
+      || state_ == HashJoinState::REPARTITIONING_PROBE);
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_END_BATCH);
   probe_batch_->TransferResourceOwnership(out_batch);
   if (out_batch->AtCapacity()) {
     // The out_batch has resources associated with it that will be recycled on the
@@ -336,7 +353,7 @@
     ResetForProbe();
   } else {
     // Finished processing spilled probe rows from this partition.
-    if (state_ == PROBING_SPILLED_PARTITION
+    if (state_ == HashJoinState::PROBING_SPILLED_PARTITION
         && NeedToProcessUnmatchedBuildRows(join_op_)) {
       // If the build partition was in memory, we are done probing this partition.
       // In case of right-outer, right-anti and full-outer joins, we move this partition
@@ -368,16 +385,12 @@
 
 // TODO: refactor this method to better separate the logic operating on the builder
 // vs probe data structures.
-Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe(
-    RuntimeState* state, bool* got_partition) {
+Status PartitionedHashJoinNode::PrepareSpilledPartitionForProbe() {
   VLOG(2) << "PrepareSpilledPartitionForProbe\n" << NodeDebugString();
   DCHECK(input_partition_ == NULL);
   DCHECK_EQ(builder_->num_hash_partitions(), 0);
   DCHECK(probe_hash_partitions_.empty());
-  if (spilled_partitions_.empty()) {
-    *got_partition = false;
-    return Status::OK();
-  }
+  DCHECK(!spilled_partitions_.empty());
 
   // TODO: the builder should choose the spilled partition to process.
   input_partition_ = std::move(spilled_partitions_.front());
@@ -397,8 +410,7 @@
           runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
     }
 
-    *got_partition = true;
-    UpdateState(PROBING_SPILLED_PARTITION);
+    UpdateState(HashJoinState::PROBING_SPILLED_PARTITION);
     COUNTER_ADD(num_hash_table_builds_skipped_, 1);
     return Status::OK();
   }
@@ -415,7 +427,7 @@
 
   if (!built) {
     // This build partition still does not fit in memory, repartition.
-    UpdateState(REPARTITIONING_BUILD);
+    UpdateState(HashJoinState::REPARTITIONING_BUILD);
 
     int next_partition_level = build_partition->level() + 1;
     if (UNLIKELY(next_partition_level >= MAX_PARTITION_DEPTH)) {
@@ -446,7 +458,7 @@
     }
 
     RETURN_IF_ERROR(PrepareForProbe());
-    UpdateState(REPARTITIONING_PROBE);
+    UpdateState(HashJoinState::REPARTITIONING_PROBE);
   } else {
     DCHECK(!input_partition_->build_partition()->is_spilled());
     DCHECK(input_partition_->build_partition()->hash_tbl() != NULL);
@@ -455,11 +467,41 @@
     for (int i = 0; i < PARTITION_FANOUT; ++i) {
       hash_tbls_[i] = input_partition_->build_partition()->hash_tbl();
     }
-    UpdateState(PROBING_SPILLED_PARTITION);
+    UpdateState(HashJoinState::PROBING_SPILLED_PARTITION);
   }
 
   COUNTER_ADD(num_probe_rows_partitioned_, input_partition_->probe_rows()->num_rows());
-  *got_partition = true;
+  return Status::OK();
+}
+
+Status PartitionedHashJoinNode::ProcessProbeBatch(RowBatch* out_batch) {
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_IN_BATCH);
+  DCHECK_NE(probe_batch_pos_, -1);
+  // Putting SCOPED_TIMER in the IR version of ProcessProbeBatch() causes weird exception
+  // handling IR in the xcompiled function, so call it here instead.
+  int rows_added = 0;
+  Status status;
+  TPrefetchMode::type prefetch_mode = runtime_state_->query_options().prefetch_mode;
+  SCOPED_TIMER(probe_timer_);
+  if (process_probe_batch_fn_ == NULL) {
+    rows_added = ProcessProbeBatch(
+        join_op_, prefetch_mode, out_batch, ht_ctx_.get(), &status);
+  } else {
+    DCHECK(process_probe_batch_fn_level0_ != NULL);
+    if (ht_ctx_->level() == 0) {
+      rows_added = process_probe_batch_fn_level0_(
+          this, prefetch_mode, out_batch, ht_ctx_.get(), &status);
+    } else {
+      rows_added = process_probe_batch_fn_(
+          this, prefetch_mode, out_batch, ht_ctx_.get(), &status);
+    }
+  }
+  if (UNLIKELY(rows_added < 0)) {
+    DCHECK(!status.ok());
+    return status;
+  }
+  DCHECK(status.ok());
+  out_batch->CommitRows(rows_added);
   return Status::OK();
 }
 
@@ -513,133 +555,113 @@
   // which can happen in a subplan.
   int num_rows_before = out_batch->num_rows();
 
-  while (true) {
-    DCHECK(!*eos);
+  // This loop executes the 'probe_state_' state machine until either a full batch is
+  // produced, resources are attached to 'out_batch' that require flushing, or eos
+  // is reached (i.e. all rows are returned). The next call into GetNext() will resume
+  // execution of the state machine where the current call into GetNext() left off.
+  // See the definition of ProbeState for description of the state machine and states.
+  do {
     DCHECK(status.ok());
-    DCHECK_NE(state_, PARTITIONING_BUILD) << "Should not be in GetNext()";
+    DCHECK(state_ != HashJoinState::PARTITIONING_BUILD)
+        << "Should not be in GetNext() " << static_cast<int>(state_);
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(QueryMaintenance(state));
+    switch (probe_state_) {
+      case ProbeState::PROBING_IN_BATCH: {
+        // Finish processing rows in the current probe batch.
+        RETURN_IF_ERROR(ProcessProbeBatch(out_batch));
+        DCHECK(out_batch->AtCapacity() || probe_batch_pos_ == probe_batch_->num_rows());
+        if (probe_batch_pos_ == probe_batch_->num_rows()
+            && current_probe_row_ == nullptr) {
+          probe_state_ = ProbeState::PROBING_END_BATCH;
+        }
+        break;
+      }
+      case ProbeState::PROBING_END_BATCH: {
+        // Try to get the next row batch from the current probe input.
+        RETURN_IF_ERROR(NextProbeRowBatch(state, out_batch));
 
-    if (!output_build_partitions_.empty()) {
-      DCHECK(NeedToProcessUnmatchedBuildRows(join_op_));
-
-      // Flush the remaining unmatched build rows of any partitions we are done
-      // processing before moving onto the next partition.
-      RETURN_IF_ERROR(OutputUnmatchedBuild(out_batch));
-      if (out_batch->AtCapacity()) break;
-
-      // Stopped before batch was at capacity: - we must have finished outputting
-      // unmatched build rows.
-      DCHECK(output_build_partitions_.empty());
-      DCHECK_EQ(builder_->num_hash_partitions(), 0);
-      DCHECK(probe_hash_partitions_.empty());
-    }
-
-    if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-      // In this case, we want to output rows from the null aware partition.
-      if (builder_->null_aware_partition() == NULL) {
-        DCHECK(null_aware_probe_partition_ == NULL);
+        if (probe_batch_pos_ == 0) {
+          // Got a batch, need to process it.
+          probe_state_ = ProbeState::PROBING_IN_BATCH;
+        } else if (probe_side_eos_ && input_partition_ == nullptr) {
+          DCHECK_EQ(probe_batch_pos_, -1);
+          // Finished processing all the probe rows for the current hash partitions.
+          // There may be some partitions that need to output their unmatched build rows.
+          RETURN_IF_ERROR(DoneProbing(state, out_batch));
+          probe_state_ = output_build_partitions_.empty() ?
+              ProbeState::PROBE_COMPLETE :
+              ProbeState::OUTPUTTING_UNMATCHED;
+        } else {
+          // Got an empty batch with resources that we need to flush before getting the
+          // next batch.
+          DCHECK_EQ(probe_batch_pos_, -1);
+        }
+        break;
+      }
+      case ProbeState::OUTPUTTING_UNMATCHED: {
+        DCHECK(!output_build_partitions_.empty());
+        DCHECK_EQ(builder_->num_hash_partitions(), 0);
+        DCHECK(probe_hash_partitions_.empty());
+        DCHECK(NeedToProcessUnmatchedBuildRows(join_op_));
+        // Output the remaining batch of build rows from the current partition.
+        RETURN_IF_ERROR(OutputUnmatchedBuild(out_batch));
+        DCHECK(out_batch->AtCapacity() || output_build_partitions_.empty());
+        if (output_build_partitions_.empty()) probe_state_ = ProbeState::PROBE_COMPLETE;
+        break;
+      }
+      case ProbeState::PROBE_COMPLETE: {
+        if (!spilled_partitions_.empty()) {
+          // Move to the next spilled partition.
+          RETURN_IF_ERROR(PrepareSpilledPartitionForProbe());
+          probe_state_ = ProbeState::PROBING_END_BATCH;
+        } else if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN
+            && builder_->null_aware_partition() != nullptr) {
+          // Null aware anti join outputs additional rows after all the probe input,
+          // including spilled partitions, is processed.
+          bool has_null_aware_rows;
+          RETURN_IF_ERROR(PrepareNullAwarePartition(&has_null_aware_rows));
+          probe_state_ = has_null_aware_rows ? ProbeState::OUTPUTTING_NULL_AWARE :
+                                               ProbeState::OUTPUTTING_NULL_PROBE;
+        } else {
+          // No more rows to output from GetNext().
+          probe_state_ = ProbeState::EOS;
+        }
+        break;
+      }
+      case ProbeState::OUTPUTTING_NULL_AWARE: {
+        DCHECK_ENUM_EQ(join_op_, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
+        DCHECK(builder_->null_aware_partition() != nullptr);
+        DCHECK(null_aware_probe_partition_ != nullptr);
+        bool napr_eos;
+        RETURN_IF_ERROR(OutputNullAwareProbeRows(state, out_batch, &napr_eos));
+        if (napr_eos) probe_state_ = ProbeState::OUTPUTTING_NULL_PROBE;
+        break;
+      }
+      case ProbeState::OUTPUTTING_NULL_PROBE: {
+        DCHECK_ENUM_EQ(join_op_, TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN);
+        DCHECK(builder_->null_aware_partition() != nullptr);
+        DCHECK_GE(null_probe_output_idx_, 0);
+        bool nanp_done;
+        RETURN_IF_ERROR(OutputNullAwareNullProbe(state, out_batch, &nanp_done));
+        if (nanp_done) probe_state_ = ProbeState::EOS;
+        break;
+      }
+      case ProbeState::EOS: {
+        // Ensure that all potential sources of output rows are exhausted.
+        DCHECK(probe_side_eos_);
+        DCHECK(output_build_partitions_.empty());
+        DCHECK(spilled_partitions_.empty());
+        DCHECK(builder_->null_aware_partition() == nullptr);
+        DCHECK(null_aware_probe_partition_ == nullptr);
         *eos = true;
         break;
       }
-
-      if (null_probe_output_idx_ >= 0) {
-        RETURN_IF_ERROR(OutputNullAwareNullProbe(state, out_batch));
-        if (out_batch->AtCapacity()) break;
-        continue;
-      }
-
-      if (output_null_aware_probe_rows_running_) {
-        RETURN_IF_ERROR(OutputNullAwareProbeRows(state, out_batch));
-        if (out_batch->AtCapacity()) break;
-        continue;
-      }
+      default:
+        DCHECK(false) << "invalid probe_state_" << static_cast<int>(probe_state_);
+        break;
     }
-
-    // Finish processing rows in the current probe batch.
-    if (probe_batch_pos_ != -1) {
-      // Putting SCOPED_TIMER in ProcessProbeBatch() causes weird exception handling IR
-      // in the xcompiled function, so call it here instead.
-      int rows_added = 0;
-      TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
-      SCOPED_TIMER(probe_timer_);
-      if (process_probe_batch_fn_ == NULL) {
-        rows_added = ProcessProbeBatch(join_op_, prefetch_mode, out_batch, ht_ctx_.get(),
-            &status);
-      } else {
-        DCHECK(process_probe_batch_fn_level0_ != NULL);
-        if (ht_ctx_->level() == 0) {
-          rows_added = process_probe_batch_fn_level0_(this, prefetch_mode, out_batch,
-              ht_ctx_.get(), &status);
-        } else {
-          rows_added = process_probe_batch_fn_(this, prefetch_mode, out_batch,
-              ht_ctx_.get(), &status);
-        }
-      }
-      if (UNLIKELY(rows_added < 0)) {
-        DCHECK(!status.ok());
-        return status;
-      }
-      DCHECK(status.ok());
-      out_batch->CommitRows(rows_added);
-      if (out_batch->AtCapacity()) break;
-
-      DCHECK(current_probe_row_ == NULL);
-    }
-
-    // Try to continue from the current probe side input.
-    if (state_ == PARTITIONING_PROBE) {
-      DCHECK(input_partition_ == NULL);
-      RETURN_IF_ERROR(NextProbeRowBatch(state, out_batch));
-    } else {
-      DCHECK(state_ == REPARTITIONING_PROBE || state_ == PROBING_SPILLED_PARTITION)
-          << state_;
-      DCHECK(probe_side_eos_);
-      if (input_partition_ != NULL) {
-        RETURN_IF_ERROR(NextSpilledProbeRowBatch(state, out_batch));
-      }
-    }
-    // Free expr result allocations of the probe side expressions only after
-    // ExprValuesCache has been reset.
-    DCHECK(ht_ctx_->expr_values_cache()->AtEnd());
-    probe_expr_results_pool_->Clear();
-
-    // We want to return as soon as we have attached a tuple stream to the out_batch
-    // (before preparing a new partition). The attached tuple stream will be recycled
-    // by the caller, freeing up more memory when we prepare the next partition.
-    if (out_batch->AtCapacity()) break;
-
-    // Got a batch, just keep going.
-    if (probe_batch_pos_ == 0) continue;
-    DCHECK_EQ(probe_batch_pos_, -1);
-
-    // Finished up all probe rows for 'hash_partitions_'. We may have already cleaned up
-    // the hash partitions, e.g. if we had to output some unmatched build rows below.
-    if (builder_->num_hash_partitions() != 0) {
-      RETURN_IF_ERROR(DoneProbing(state, out_batch));
-      if (out_batch->AtCapacity()) break;
-    }
-
-    // There are some partitions that need to flush their unmatched build rows.
-    if (!output_build_partitions_.empty()) continue;
-
-    // We get this far if there is nothing left to return from the current partition.
-    // Move to the next spilled partition.
-    bool got_partition;
-    RETURN_IF_ERROR(PrepareSpilledPartitionForProbe(state, &got_partition));
-    if (got_partition) continue; // Probe the spilled partition.
-
-    if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
-      // Prepare the null-aware partitions, then resume at the top of the loop to output
-      // the rows.
-      RETURN_IF_ERROR(PrepareNullAwarePartition());
-      continue;
-    }
-    DCHECK(builder_->null_aware_partition() == NULL);
-
-    *eos = true;
-    break;
-  }
+  } while (!out_batch->AtCapacity() && !*eos);
 
   int num_rows_added = out_batch->num_rows() - num_rows_before;
   DCHECK_GE(num_rows_added, 0);
@@ -662,6 +684,7 @@
   SCOPED_TIMER(probe_timer_);
   DCHECK(NeedToProcessUnmatchedBuildRows(join_op_));
   DCHECK(!output_build_partitions_.empty());
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::OUTPUTTING_UNMATCHED);
 
   if (output_unmatched_batch_iter_.get() != NULL) {
     // There were no probe rows so we skipped building the hash table. In this case, all
@@ -676,6 +699,7 @@
 }
 
 Status PartitionedHashJoinNode::OutputAllBuild(RowBatch* out_batch) {
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::OUTPUTTING_UNMATCHED);
   // This will only be called for partitions that are added to 'output_build_partitions_'
   // in NextSpilledProbeRowBatch(), which adds one partition that is then processed until
   // it is done by the loop in GetNext(). So, there must be exactly one partition in
@@ -727,6 +751,7 @@
 }
 
 void PartitionedHashJoinNode::OutputUnmatchedBuildFromHashTable(RowBatch* out_batch) {
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::OUTPUTTING_UNMATCHED);
   ScalarExprEvaluator* const* conjunct_evals = conjunct_evals_.data();
   const int num_conjuncts = conjuncts_.size();
   RowBatch::Iterator out_batch_iterator(out_batch, out_batch->num_rows());
@@ -786,12 +811,13 @@
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::OutputNullAwareNullProbe(RuntimeState* state,
-    RowBatch* out_batch) {
+Status PartitionedHashJoinNode::OutputNullAwareNullProbe(
+    RuntimeState* state, RowBatch* out_batch, bool* done) {
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::OUTPUTTING_NULL_PROBE);
   DCHECK(builder_->null_aware_partition() != NULL);
   DCHECK(null_aware_probe_partition_ != NULL);
-  DCHECK(!output_null_aware_probe_rows_running_);
   DCHECK_NE(probe_batch_pos_, -1);
+  *done = false;
 
   if (probe_batch_pos_ == probe_batch_->num_rows()) {
     probe_batch_pos_ = 0;
@@ -807,6 +833,7 @@
       // Flush out the resources to free up memory.
       null_probe_rows_->Close(out_batch, RowBatch::FlushMode::FLUSH_RESOURCES);
       null_probe_rows_.reset();
+      *done = true;
       return Status::OK();
     }
   }
@@ -876,10 +903,10 @@
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::PrepareNullAwarePartition() {
+Status PartitionedHashJoinNode::PrepareNullAwarePartition(bool* has_null_aware_rows) {
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBE_COMPLETE);
   DCHECK(builder_->null_aware_partition() != NULL);
   DCHECK(null_aware_probe_partition_ != NULL);
-  DCHECK(!output_null_aware_probe_rows_running_);
   DCHECK_EQ(probe_batch_pos_, -1);
   DCHECK_EQ(probe_batch_->num_rows(), 0);
 
@@ -890,12 +917,11 @@
     // There were no build rows. Nothing to do. Just prepare to output the null
     // probe rows.
     DCHECK_EQ(probe_stream->num_rows(), 0);
-    output_null_aware_probe_rows_running_ = false;
     RETURN_IF_ERROR(PrepareNullAwareNullProbe());
+    *has_null_aware_rows = false;
     return Status::OK();
   }
 
-  output_null_aware_probe_rows_running_ = true;
   bool pinned;
   RETURN_IF_ERROR(build_stream->PinStream(&pinned));
   if (!pinned) return NullAwareAntiJoinError(build_stream);
@@ -908,15 +934,16 @@
         runtime_state_, Substitute(PREPARE_FOR_READ_FAILED_ERROR_MSG, id_));
   }
   probe_batch_pos_ = 0;
+  *has_null_aware_rows = true;
   return Status::OK();
 }
 
-Status PartitionedHashJoinNode::OutputNullAwareProbeRows(RuntimeState* state,
-    RowBatch* out_batch) {
+Status PartitionedHashJoinNode::OutputNullAwareProbeRows(
+    RuntimeState* state, RowBatch* out_batch, bool* done) {
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::OUTPUTTING_NULL_AWARE);
   DCHECK(builder_->null_aware_partition() != NULL);
   DCHECK(null_aware_probe_partition_ != NULL);
-  DCHECK(output_null_aware_probe_rows_running_);
-
+  *done = false;
   ScalarExprEvaluator* const* join_conjunct_evals = other_join_conjunct_evals_.data();
   int num_join_conjuncts = other_join_conjuncts_.size();
   DCHECK(probe_batch_ != NULL);
@@ -931,11 +958,11 @@
     bool eos;
     RETURN_IF_ERROR(probe_stream->GetNext(probe_batch_.get(), &eos));
 
-    if (probe_batch_->num_rows() == 0) {
-      RETURN_IF_ERROR(EvaluateNullProbe(
-            state, builder_->null_aware_partition()->build_rows()));
-      output_null_aware_probe_rows_running_ = false;
+    if (probe_batch_->num_rows() == 0 && eos) {
+      RETURN_IF_ERROR(
+          EvaluateNullProbe(state, builder_->null_aware_partition()->build_rows()));
       RETURN_IF_ERROR(PrepareNullAwareNullProbe());
+      *done = true;
       return Status::OK();
     }
   }
@@ -1101,10 +1128,17 @@
 }
 
 Status PartitionedHashJoinNode::DoneProbing(RuntimeState* state, RowBatch* batch) {
+  DCHECK_ENUM_EQ(probe_state_, ProbeState::PROBING_END_BATCH);
   DCHECK_EQ(probe_batch_pos_, -1);
   // At this point all the rows have been read from the probe side for all partitions in
   // hash_partitions_.
   VLOG(2) << "Probe Side Consumed\n" << NodeDebugString();
+  if (builder_->num_hash_partitions() == 0) {
+    // No hash partitions, so no cleanup required. This can only happen when we are
+    // processing a single spilled partition.
+    DCHECK_ENUM_EQ(state_, HashJoinState::PROBING_SPILLED_PARTITION);
+    return Status::OK();
+  }
 
   // Walk the partitions that had hash tables built for the probe phase and close them.
   // In the case of right outer and full outer joins, instead of closing those partitions,
@@ -1170,15 +1204,20 @@
 void PartitionedHashJoinNode::UpdateState(HashJoinState next_state) {
   // Validate the state transition.
   switch (state_) {
-    case PARTITIONING_BUILD: DCHECK_EQ(next_state, PARTITIONING_PROBE); break;
-    case PARTITIONING_PROBE:
-    case REPARTITIONING_PROBE:
-    case PROBING_SPILLED_PARTITION:
-      DCHECK(
-          next_state == REPARTITIONING_BUILD || next_state == PROBING_SPILLED_PARTITION);
+    case HashJoinState::PARTITIONING_BUILD:
+      DCHECK_ENUM_EQ(next_state, HashJoinState::PARTITIONING_PROBE);
       break;
-    case REPARTITIONING_BUILD: DCHECK_EQ(next_state, REPARTITIONING_PROBE); break;
-    default: DCHECK(false) << "Invalid state " << state_;
+    case HashJoinState::PARTITIONING_PROBE:
+    case HashJoinState::REPARTITIONING_PROBE:
+    case HashJoinState::PROBING_SPILLED_PARTITION:
+      DCHECK(next_state == HashJoinState::REPARTITIONING_BUILD
+          || next_state == HashJoinState::PROBING_SPILLED_PARTITION);
+      break;
+    case HashJoinState::REPARTITIONING_BUILD:
+      DCHECK_ENUM_EQ(next_state, HashJoinState::REPARTITIONING_PROBE);
+      break;
+    default:
+      DCHECK(false) << "Invalid state " << static_cast<int>(state_);
   }
   state_ = next_state;
   VLOG(2) << "Transitioned State:" << endl << NodeDebugString();
@@ -1186,11 +1225,16 @@
 
 string PartitionedHashJoinNode::PrintState() const {
   switch (state_) {
-    case PARTITIONING_BUILD: return "PartitioningBuild";
-    case PARTITIONING_PROBE: return "PartitioningProbe";
-    case PROBING_SPILLED_PARTITION: return "ProbingSpilledPartition";
-    case REPARTITIONING_BUILD: return "RepartitioningBuild";
-    case REPARTITIONING_PROBE: return "RepartitioningProbe";
+    case HashJoinState::PARTITIONING_BUILD:
+      return "PartitioningBuild";
+    case HashJoinState::PARTITIONING_PROBE:
+      return "PartitioningProbe";
+    case HashJoinState::PROBING_SPILLED_PARTITION:
+      return "ProbingSpilledPartition";
+    case HashJoinState::REPARTITIONING_BUILD:
+      return "RepartitioningBuild";
+    case HashJoinState::REPARTITIONING_PROBE:
+      return "RepartitioningProbe";
     default: DCHECK(false);
   }
   return "";
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index b771e32..126beb1 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -83,6 +83,9 @@
 ///      either PROBING_SPILLED_PARTITION or REPARTITIONING_BUILD, depending on whether
 ///      the spilled partition's hash table fits in memory or not.
 ///
+///      This phase has sub-states (see ProbeState) that are used in GetNext() to drive
+///      progress.
+///
 ///   3. [PROBING_SPILLED_PARTITION] Read the probe rows from a spilled partition that
 ///      was brought back into memory and probe the partition's hash table. Finally,
 ///      output unmatched build rows for join modes that require it.
@@ -92,6 +95,9 @@
 ///      either PROBING_SPILLED_PARTITION or REPARTITIONING_BUILD, depending on whether
 ///      the spilled partition's hash table fits in memory or not.
 ///
+///      This phase has sub-states (see ProbeState) that are used in GetNext() to drive
+///      progress.
+///
 /// Null aware anti-join (NAAJ) extends the above algorithm by accumulating rows with
 /// NULLs into several different streams, which are processed in a separate step to
 /// produce additional output rows. The NAAJ algorithm is documented in more detail in
@@ -121,7 +127,7 @@
  private:
   class ProbePartition;
 
-  enum HashJoinState {
+  enum class HashJoinState {
     /// Partitioning the build (right) child's input into the builder's hash partitions.
     PARTITIONING_BUILD,
 
@@ -144,6 +150,73 @@
     REPARTITIONING_PROBE,
   };
 
+  // This enum represents a sub-state of the PARTITIONING_PROBE,
+  // PROBING_SPILLED_PARTITION and REPARTITIONING_PROBE states.
+  // This drives the state machine in GetNext() that processes probe batches and generates
+  // output rows. This state machine executes within a HashJoinState state, starting with
+  // PROBING_IN_BATCH and ending with PROBE_COMPLETE.
+  //
+  // The state transition diagram is below. The state machine handles iterating through
+  // probe batches (PROBING_IN_BATCH <-> PROBING_END_BATCH), with each input probe batch
+  // producing a variable number of output rows. Once the last probe batch is processed,
+  // additional rows may need to be emitted from the build side per-partition, e.g.
+  // for right outer join (OUTPUTTING_UNMATCHED). Then PROBE_COMPLETE is entered,
+  // indicating that probing is complete. Then the top-level state machine (described by
+  // HashJoinState) takes over and either the next spilled partition is processed, final
+  // null-aware anti join processing is done, or eos can be returned from GetNext().
+  //
+  // start                     if hash tables
+  //     +------------------+  store matches  +----------------------+
+  //---->+ PROBING_IN_BATCH |  +------------->+ OUTPUTTING_UNMATCHED |
+  //     +-----+-----+------+  |              +------+---------------+
+  //           ^     |         |                     |
+  //           |     |         |                     |
+  //           |     v         |                     v
+  //     +-----+-----+-------+ | otherwise  +--------+-------+
+  // +-->+ PROBING_END_BATCH +-+----------->+ PROBE_COMPLETE |
+  // |   +-------------------+              +--+------+------+
+  // |                                       | |    | if NAAJ
+  // +---------------------------------------+ |    | and no spilled
+  //      if spilled partitions left           |    | partitions left
+  //                                           |    |
+  //                           if not NAAJ     |    |
+  //                           and no spilled  |    +---------------+
+  //                +-------+  partitions left |    | if null-aware | otherwise
+  //                |  EOS  +<-----------------+    | partition     |
+  //                +---+---+                       v has rows      |
+  //                    ^              +------------+----------+    |
+  //                    |              | OUTPUTTING_NULL_AWARE |    |
+  //                    |              +------------+----------+    |
+  //                    |                           |               |
+  //                    |                           v               |
+  //                    |              +------------+----------+    |
+  //                    +--------------+ OUTPUTTING_NULL_PROBE +<---+
+  //                                   +-----------------------+
+  enum class ProbeState {
+    // Processing probe batches and more rows in the current probe batch must be
+    // processed.
+    PROBING_IN_BATCH,
+    // Processing probe batches and no more rows in the current probe batch to process.
+    PROBING_END_BATCH,
+    // All probe batches have been processed, unmatched build rows need to be outputted
+    // from 'output_build_partitions_'.
+    // This state is only used if NeedToProcessUnmatchedBuildRows(join_op_) is true.
+    OUTPUTTING_UNMATCHED,
+    // All input probe rows from the child ExecNode or the current spilled partition have
+    // been processed, and all unmatched rows from the build have been output.
+    PROBE_COMPLETE,
+    // All input has been processed. We need to process builder->null_aware_partition()
+    // and output any rows from it.
+    // This state is only used if join_op_ is NULL_AWARE_ANTI_JOIN.
+    OUTPUTTING_NULL_AWARE,
+    // All input has been processed. We need to process null_probe_rows_ and output any
+    // rows from it.
+    // This state is only used if join_op_ is NULL_AWARE_ANTI_JOIN.
+    OUTPUTTING_NULL_PROBE,
+    // All output rows have been produced - GetNext() should return eos.
+    EOS,
+  };
+
   /// Constants from PhjBuilder, added to this node for convenience.
   static const int PARTITION_FANOUT = PhjBuilder::PARTITION_FANOUT;
   static const int NUM_PARTITIONING_BITS = PhjBuilder::NUM_PARTITIONING_BITS;
@@ -288,6 +361,10 @@
   int ProcessProbeBatch(TPrefetchMode::type, RowBatch* out_batch, HashTableCtx* ht_ctx,
       Status* status);
 
+  /// Wrapper that calls either the interpreted or codegen'd version of
+  /// ProcessProbeBatch() and commits the rows to 'out_batch' on success.
+  Status ProcessProbeBatch(RowBatch* out_batch);
+
   /// Wrapper that calls the templated version of ProcessProbeBatch() based on 'join_op'.
   int ProcessProbeBatch(const TJoinOp::type join_op, TPrefetchMode::type,
       RowBatch* out_batch, HashTableCtx* ht_ctx, Status* status);
@@ -323,12 +400,17 @@
   Status InitNullProbeRows() WARN_UNUSED_RESULT;
 
   /// Initializes null_aware_partition_ and nulls_build_batch_ to output rows.
-  Status PrepareNullAwarePartition() WARN_UNUSED_RESULT;
+  /// *has_null_aware_rows is set to true if the null-aware partition has rows that need
+  /// to be processed by calling OutputNullAwareProbeRows(), false otherwise. In both
+  /// cases, null probe rows need to be processed with OutputNullAwareNullProbe().
+  Status PrepareNullAwarePartition(bool* has_null_aware_rows) WARN_UNUSED_RESULT;
 
-  /// Continues processing from null_aware_partition_. Called after we have finished
-  /// processing all build and probe input (including repartitioning them).
+  /// Output rows from builder_->null_aware_partition(). Called when 'probe_state_'
+  /// is OUTPUTTING_NULL_AWARE - after all input is processed, including spilled
+  /// partitions. Sets *done = true if there are no more rows to output from this
+  /// function, false otherwise.
   Status OutputNullAwareProbeRows(
-      RuntimeState* state, RowBatch* out_batch) WARN_UNUSED_RESULT;
+      RuntimeState* state, RowBatch* out_batch, bool* done) WARN_UNUSED_RESULT;
 
   /// Evaluates all other_join_conjuncts against null_probe_rows_ with all the
   /// rows in build. This updates matched_null_probe_, short-circuiting if one of the
@@ -337,17 +419,21 @@
   Status EvaluateNullProbe(
       RuntimeState* state, BufferedTupleStream* build) WARN_UNUSED_RESULT;
 
-  /// Prepares to output NULLs on the probe side for NAAJ. Before calling this,
-  /// matched_null_probe_ should have been fully evaluated.
+  /// Prepares to output NULLs on the probe side for NAAJ, when transitioning
+  /// 'probe_state_' to OUTPUTTING_NULL_PROBE. Before calling this, 'matched_null_probe_'
+  /// must be fully evaluated.
   Status PrepareNullAwareNullProbe() WARN_UNUSED_RESULT;
 
   /// Outputs NULLs on the probe side, returning rows where matched_null_probe_[i] is
-  /// false. Used for NAAJ.
+  /// false. Called repeatedly after PrepareNullAwareNullProbe(), when 'probe_state_'
+  /// is OUTPUTTING_NULL_PROBE for NAAJ. Sets *done = true if there are no more rows to
+  /// output from this function, false otherwise.
   Status OutputNullAwareNullProbe(
-      RuntimeState* state, RowBatch* out_batch) WARN_UNUSED_RESULT;
+      RuntimeState* state, RowBatch* out_batch, bool* done) WARN_UNUSED_RESULT;
 
-  /// Call at the end of consuming the probe rows. Cleans up the build and probe hash
-  /// partitions and:
+  /// Call at the end of consuming the probe rows, when 'probe_state_' is
+  /// PROBING_END_BATCH, before transitioning to PROBE_COMPLETE or OUTPUTTING_UNMATCHED.
+  /// Cleans up the build and probe hash partitions, if needed, and:
   ///  - If the build partition had a hash table, close it. The build and probe
   ///    partitions are fully processed. The streams are transferred to 'batch'.
   ///    In the case of right-outer and full-outer joins, instead of closing this
@@ -357,26 +443,37 @@
   ///    rows were spilled, move the partition to 'spilled_partitions_'.
   Status DoneProbing(RuntimeState* state, RowBatch* batch) WARN_UNUSED_RESULT;
 
+  /// Get the next row batch from the probe (left) side (child(0)), if we are still
+  /// doing the first pass over the input (i.e. state_ is PARTITIONING_PROBE) or
+  /// from the spilled 'input_partition_' if state_ is REPARTITIONING_PROBE.
+  /// If we are done consuming the input, sets 'probe_batch_pos_' to -1, otherwise,
+  /// sets it to 0.  'probe_state_' must be PROBING_END_BATCH.
+  Status NextProbeRowBatch(
+      RuntimeState* state, RowBatch* out_batch) WARN_UNUSED_RESULT;
+
   /// Get the next row batch from the probe (left) side (child(0)). If we are done
   /// consuming the input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.
-  Status NextProbeRowBatch(RuntimeState* state, RowBatch* out_batch) WARN_UNUSED_RESULT;
+  /// 'probe_state_' must be PROBING_END_BATCH.
+  Status NextProbeRowBatchFromChild(
+      RuntimeState* state, RowBatch* out_batch) WARN_UNUSED_RESULT;
 
   /// Get the next probe row batch from 'input_partition_'. If we are done consuming the
   /// input, sets 'probe_batch_pos_' to -1, otherwise, sets it to 0.
+  /// 'probe_state_' must be PROBING_END_BATCH.
   Status NextSpilledProbeRowBatch(
       RuntimeState* state, RowBatch* out_batch) WARN_UNUSED_RESULT;
 
-  /// Moves onto the next spilled partition and initializes 'input_partition_'. This
-  /// function processes the entire build side of 'input_partition_' and when this
-  /// function returns, we are ready to consume the probe side of 'input_partition_'.
+  /// Called when 'probe_state_' is PROBE_COMPLETE to start processing the next spilled
+  /// partition. This function sets 'input_partition_' to the chosen partition, then
+  /// processes the entire build side of 'input_partition_'. When this function
+  /// returns, we are ready to consume probe rows in 'input_partition_'.
   /// If the build side's hash table fits in memory and there are probe rows, we will
   /// construct input_partition_'s hash table. If it does not fit, meaning we need to
   /// repartition, this function will repartition the build rows into
   /// 'builder->hash_partitions_' and prepare for repartitioning the partition's probe
   /// rows. If there are no probe rows, we just prepare the build side to be read by
   /// OutputUnmatchedBuild().
-  Status PrepareSpilledPartitionForProbe(
-      RuntimeState* state, bool* got_partition) WARN_UNUSED_RESULT;
+  Status PrepareSpilledPartitionForProbe() WARN_UNUSED_RESULT;
 
   /// Construct an error status for the null-aware anti-join when it could not fit 'rows'
   /// from the build side in memory.
@@ -387,7 +484,8 @@
   /// 'row_batch' is not NULL, transfers ownership of all row-backing resources to it.
   void CloseAndDeletePartitions(RowBatch* row_batch);
 
-  /// Prepares for probing the next batch.
+  /// Prepares for probing the next batch. Called after populating 'probe_batch_'
+  /// with rows and entering 'probe_state_' PROBING_IN_BATCH.
   void ResetForProbe();
 
   /// Codegen function to create output row. Assumes that the probe row is non-NULL.
@@ -432,20 +530,24 @@
   HashTable::Iterator hash_tbl_iterator_;
 
   /// Number of probe rows that have been partitioned.
-  RuntimeProfile::Counter* num_probe_rows_partitioned_;
+  RuntimeProfile::Counter* num_probe_rows_partitioned_ = nullptr;
 
   /// Time spent evaluating other_join_conjuncts for NAAJ.
-  RuntimeProfile::Counter* null_aware_eval_timer_;
+  RuntimeProfile::Counter* null_aware_eval_timer_ = nullptr;
 
   /// Number of partitions which had zero probe rows and we therefore didn't build the
   /// hash table.
-  RuntimeProfile::Counter* num_hash_table_builds_skipped_;
+  RuntimeProfile::Counter* num_hash_table_builds_skipped_ = nullptr;
 
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
-  /// State of the partitioned hash join algorithm. Used just for debugging.
-  HashJoinState state_;
+  /// State of the partitioned hash join algorithm. See HashJoinState for more
+  /// information.
+  HashJoinState state_ = HashJoinState::PARTITIONING_BUILD;
+
+  /// State of the probing algorithm. Used to drive the state machine in GetNext().
+  ProbeState probe_state_ = ProbeState::PROBE_COMPLETE;
 
   /// The build-side of the join. Initialized in Init().
   boost::scoped_ptr<PhjBuilder> builder_;
@@ -475,13 +577,10 @@
   std::unique_ptr<ProbePartition> input_partition_;
 
   /// In the case of right-outer and full-outer joins, this is the list of the partitions
-  /// for which we need to output their unmatched build rows.
-  /// This list is populated at DoneProbing().
+  /// for which we need to output their unmatched build rows. This list is populated at
+  /// DoneProbing(). If this is non-empty, probe_state_ must be OUTPUTTING_UNMATCHED.
   std::list<PhjBuilder::Partition*> output_build_partitions_;
 
-  /// Whether this join is in a state outputting rows from OutputNullAwareProbeRows().
-  bool output_null_aware_probe_rows_running_;
-
   /// Partition used if 'null_aware_' is set. During probing, rows from the probe
   /// side that did not have a match in the hash table are appended to this partition.
   /// At the very end, we then iterate over the partition's probe rows. For each probe
@@ -495,24 +594,31 @@
   /// conjuncts. Must be unique_ptr so we can release it and transfer to output batches.
   /// The stream starts off in memory but is unpinned if there is memory pressure,
   /// specifically if any partitions spilled or appending to the pinned stream failed.
+  /// Populated during the first pass over the probe input (i.e. while state_ is
+  /// PARTITIONING_PROBE) and then output at the end after all input data is processed.
   std::unique_ptr<BufferedTupleStream> null_probe_rows_;
 
   /// For each row in null_probe_rows_, true if this row has matched any build row
-  /// (i.e. the resulting joined row passes other_join_conjuncts).
-  /// TODO: remove this. We need to be able to put these bits inside the tuple itself.
+  /// (i.e. the resulting joined row passes other_join_conjuncts). Populated
+  /// during the first pass over the probe input (i.e. while state_ is PARTITIONING_PROBE)
+  /// and then evaluated for each build partition.
+  /// TODO: ideally we would store the bits inside the tuple data of 'null_probe_rows_'
+  /// instead of in this untracked auxiliary memory.
   std::vector<bool> matched_null_probe_;
 
   /// The current index into null_probe_rows_/matched_null_probe_ that we are
-  /// outputting.
-  int64_t null_probe_output_idx_;
+  /// outputting. -1 means invalid. Only has a valid index when probe_state_ is
+  /// OUTPUTTING_NULL_PROBE.
+  int64_t null_probe_output_idx_ = -1;
 
   /// Used by OutputAllBuild() to iterate over the entire build side tuple stream of the
-  /// current partition.
+  /// current partition. Only used when probe_state_ is OUTPUTTING_UNMATCHED.
   std::unique_ptr<RowBatch> output_unmatched_batch_;
 
   /// Stores an iterator into 'output_unmatched_batch_' to start from on the next call to
   /// OutputAllBuild(), or NULL if there are no partitions without hash tables needing to
-  /// be processed by OutputUnmatchedBuild().
+  /// be processed by OutputUnmatchedBuild(). Only used when probe_state_ is
+  /// OUTPUTTING_UNMATCHED.
   std::unique_ptr<RowBatch::Iterator> output_unmatched_batch_iter_;
 
   /// END: Members that must be Reset()
@@ -562,9 +668,8 @@
   typedef int (*ProcessProbeBatchFn)(PartitionedHashJoinNode*,
       TPrefetchMode::type, RowBatch*, HashTableCtx*, Status*);
   /// Jitted ProcessProbeBatch function pointers.  NULL if codegen is disabled.
-  ProcessProbeBatchFn process_probe_batch_fn_;
-  ProcessProbeBatchFn process_probe_batch_fn_level0_;
-
+  ProcessProbeBatchFn process_probe_batch_fn_ = nullptr;
+  ProcessProbeBatchFn process_probe_batch_fn_level0_ = nullptr;
 };
 
 }