IMPALA-9197: make HashTable lookups thread-safe

This makes it possible to do HashTable lookups from
multiple threads without any data races. This
requires moving the statistics that are updated during
probing out of HashTable and into the HashTableCtx object.

There are some small changes to the hash table stat
logging behaviour as a result of the stats being moved
to the context. I don't believe these logs are used much,
if at all.
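
In sketch form, the pattern is (simplified relative to the
actual classes, but using names from this patch):

  struct HashTableCtx {
    // Per-thread probe statistics. Each probing thread owns
    // its own context, so these increments need no
    // synchronization.
    int64_t num_probes_ = 0;
    int64_t travel_length_ = 0;
    int64_t num_hash_collisions_ = 0;
  };

  // The shared HashTable no longer carries per-probe counters,
  // so concurrent FindProbeRow() calls on a read-only table do
  // not race. Each context's counters are folded into the
  // profile once, when the owning exec node closes:
  //   ht_ctx_->StatsCountersAdd(ht_stats_profile_.get());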

Testing:
Ran exhaustive tests.

Manually inspected some aggregation and join query profiles
to check that hash table stats looked reasonable.

Perf:
Ran TPC-H scale factor 30 on a single node. No significant
change in perf.

Change-Id: I92fbfa8cc000477b8e01975a102d818f9fa27c61
Reviewed-on: http://gerrit.cloudera.org:8080/14917
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index e0d321e..6374f80 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -399,7 +399,10 @@
   ClosePartitions();
 
   if (tuple_pool_.get() != nullptr) tuple_pool_->FreeAll();
-  if (ht_ctx_.get() != nullptr) ht_ctx_->Close(state);
+  if (ht_ctx_.get() != nullptr) {
+    ht_ctx_->StatsCountersAdd(ht_stats_profile_.get());
+    ht_ctx_->Close(state);
+  }
   ht_ctx_.reset();
   if (serialize_stream_.get() != nullptr) {
     serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
@@ -669,7 +672,7 @@
 }
 
 Status GroupingAggregator::CheckAndResizeHashPartitions(
-    bool partitioning_aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) {
+    bool partitioning_aggregated_rows, int num_rows, HashTableCtx* ht_ctx) {
   DCHECK(!is_streaming_preagg_);
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
     Partition* partition = hash_partitions_[i];
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index 5baac2e..3671327 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -583,7 +583,7 @@
   /// resize the hash tables, may spill partitions. 'aggregated_rows' is true if
   /// we're currently partitioning aggregated rows.
   Status CheckAndResizeHashPartitions(
-      bool aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
+      bool aggregated_rows, int num_rows, HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
 
   /// Prepares the next partition to return results from. On return, this function
   /// initializes output_iterator_ and output_partition_. This either removes
diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc
index 5bd2b91..bec3c4d 100644
--- a/be/src/exec/hash-table.cc
+++ b/be/src/exec/hash-table.cc
@@ -152,6 +152,7 @@
 }
 
 void HashTableCtx::Close(RuntimeState* state) {
+  if (VLOG_IS_ON(3)) VLOG(3) << PrintStats();
   free(scratch_row_);
   scratch_row_ = NULL;
   expr_values_cache_.Close(expr_perm_pool_->mem_tracker());
@@ -159,6 +160,21 @@
   ScalarExprEvaluator::Close(probe_expr_evals_, state);
 }
 
+string HashTableCtx::PrintStats() const {
+  double avg_travel = num_probes_ == 0 ? 0 : (double)travel_length_ / (double)num_probes_;
+  stringstream ss;
+  ss << "Probes: " << num_probes_ << endl;
+  ss << "Travel: " << travel_length_ << " " << avg_travel << endl;
+  ss << "HashCollisions: " << num_hash_collisions_ << endl;
+  return ss.str();
+}
+
+void HashTableCtx::StatsCountersAdd(HashTableStatsProfile* profile) {
+  COUNTER_ADD(profile->num_hash_collisions_, num_hash_collisions_);
+  COUNTER_ADD(profile->num_hash_probes_, num_probes_);
+  COUNTER_ADD(profile->num_hash_travels_, travel_length_);
+}
+
 uint32_t HashTableCtx::Hash(const void* input, int len, uint32_t hash) const {
   /// Use CRC hash at first level for better performance. Switch to murmur hash at
   /// subsequent levels since CRC doesn't randomize well with different seed inputs.
@@ -433,26 +449,21 @@
 }
 
 void HashTable::Close() {
-  // Print statistics only for the large or heavily used hash tables.
-  // TODO: Tweak these numbers/conditions, or print them always?
+  // Print statistics only for large hash tables, or when extremely verbose
+  // logging is enabled.
   const int64_t LARGE_HT = 128 * 1024;
-  const int64_t HEAVILY_USED = 1024 * 1024;
-  // TODO: These statistics should go to the runtime profile as well.
-  if ((num_buckets_ > LARGE_HT) || (num_probes_ > HEAVILY_USED)) VLOG(2) << PrintStats();
+  if (num_buckets_ > LARGE_HT || VLOG_IS_ON(3)) VLOG(2) << PrintStats();
   for (auto& data_page : data_pages_) allocator_->Free(move(data_page));
   data_pages_.clear();
   if (bucket_allocation_ != nullptr) allocator_->Free(move(bucket_allocation_));
 }
 
 void HashTable::StatsCountersAdd(HashTableStatsProfile* profile) {
-  COUNTER_ADD(profile->num_hash_collisions_, num_hash_collisions_);
-  COUNTER_ADD(profile->num_hash_probes_, num_probes_);
-  COUNTER_ADD(profile->num_hash_travels_, travel_length_);
-  COUNTER_ADD(profile->num_hash_resizes_, this->num_resizes_);
+  COUNTER_ADD(profile->num_hash_resizes_, num_resizes_);
 }
 
 Status HashTable::CheckAndResize(
-    uint64_t buckets_to_fill, const HashTableCtx* ht_ctx, bool* got_memory) {
+    uint64_t buckets_to_fill, HashTableCtx* __restrict__ ht_ctx, bool* got_memory) {
   uint64_t shift = 0;
   while (num_filled_buckets_ + buckets_to_fill >
          (num_buckets_ << shift) * MAX_FILL_FACTOR) {
@@ -464,7 +475,7 @@
 }
 
 Status HashTable::ResizeBuckets(
-    int64_t num_buckets, const HashTableCtx* ht_ctx, bool* got_memory) {
+    int64_t num_buckets, HashTableCtx* __restrict__ ht_ctx, bool* got_memory) {
   DCHECK_EQ((num_buckets & (num_buckets - 1)), 0)
       << "num_buckets=" << num_buckets << " must be a power of 2";
   DCHECK_GT(num_buckets, num_filled_buckets_)
@@ -502,8 +513,8 @@
        NextFilledBucket(&iter.bucket_idx_, &iter.node_)) {
     Bucket* bucket_to_copy = &buckets_[iter.bucket_idx_];
     bool found = false;
-    int64_t bucket_idx =
-        Probe<true>(new_buckets, num_buckets, NULL, bucket_to_copy->hash, &found);
+    int64_t bucket_idx = Probe<true, false>(
+        new_buckets, num_buckets, ht_ctx, bucket_to_copy->hash, &found);
     DCHECK(!found);
     DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND) << " Probe failed even though "
         " there are free buckets. " << num_buckets << " " << num_filled_buckets_;
@@ -581,17 +592,13 @@
 }
 
 string HashTable::PrintStats() const {
-  double curr_fill_factor = (double)num_filled_buckets_/(double)num_buckets_;
-  double avg_travel = (double)travel_length_/(double)num_probes_;
-  double avg_collisions = (double)num_hash_collisions_/(double)num_filled_buckets_;
+  double curr_fill_factor =
+      num_buckets_ == 0 ? 0 : (double)num_filled_buckets_ / (double)num_buckets_;
   stringstream ss;
   ss << "Buckets: " << num_buckets_ << " " << num_filled_buckets_ << " "
      << curr_fill_factor << endl;
   ss << "Duplicates: " << num_buckets_with_duplicates_ << " buckets "
      << num_duplicate_nodes_ << " nodes" << endl;
-  ss << "Probes: " << num_probes_ << endl;
-  ss << "Travel: " << travel_length_ << " " << avg_travel << endl;
-  ss << "HashCollisions: " << num_hash_collisions_ << " " << avg_collisions << endl;
   ss << "Resizes: " << num_resizes_ << endl;
   return ss.str();
 }
diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h
index c4df52c..03dd850 100644
--- a/be/src/exec/hash-table.h
+++ b/be/src/exec/hash-table.h
@@ -51,6 +51,7 @@
 class Tuple;
 class TupleRow;
 class HashTable;
+struct HashTableStatsProfile;
 
 /// Linear or quadratic probing hash table implementation tailored to the usage pattern
 /// for partitioned hash aggregation and hash joins. The hash table stores TupleRows and
@@ -63,11 +64,16 @@
 /// expr results for the current row being processed when possible into a contiguous
 /// memory buffer. This allows for efficient hash computation.
 //
-/// The hash table does not support removes. The hash table is not thread safe.
-/// The table is optimized for the partition hash aggregation and hash joins and is not
-/// intended to be a generic hash table implementation. The API loosely mimics the
-/// std::hashset API.
-//
+/// The hash table does not support removes. The table is optimized for partitioned
+/// hash aggregation and hash joins and is not intended to be a generic hash table
+/// implementation.
+///
+/// Operations that mutate the hash table are not thread-safe. Read-only access to
+/// the hash table is, however, thread-safe: multiple threads, each with their own
+/// HashTableCtx, can safely look up rows in the hash table with FindProbeRow(),
+/// etc., so long as no thread is mutating the hash table. Thread-safe methods are
+/// explicitly documented.
+///
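+/// For example (an illustrative sketch, not code from this patch; variable names
+/// are hypothetical):
+///
+///   // Build phase (single thread): the table is mutated.
+///   if (!hash_tbl->Insert(build_ctx, flat_row, row, &status)) return status;
+///
+///   // Probe phase: multiple threads may look up rows concurrently, each with
+///   // its own HashTableCtx, as long as no thread mutates the table.
+///   HashTable::Iterator it = hash_tbl->FindProbeRow(thread_local_ctx);
+///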
 /// The data (rows) are stored in a BufferedTupleStream. The basic data structure of this
 /// hash table is a vector of buckets. The buckets (indexed by the mod of the hash)
 /// contain a pointer to either the slot in the tuple-stream or in case of duplicate
@@ -129,6 +135,15 @@
   /// Call to cleanup any resources allocated by the expression evaluators.
   void Close(RuntimeState* state);
 
+  /// Prints some statistics that can be used for performance debugging.
+  std::string PrintStats() const;
+
+  /// Adds the probe stats of this hash table context to the counters in 'profile'.
+  /// This method should be called only once per context, while closing the owner
+  /// object of the context. Only the counters for probes, travels and collisions
+  /// are added by this method; the other counters are unaffected.
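+  /// For example, the callers in this patch pair it with Close() when tearing
+  /// down the owner:
+  ///   ht_ctx_->StatsCountersAdd(ht_stats_profile_.get());
+  ///   ht_ctx_->Close(state);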
+  void StatsCountersAdd(HashTableStatsProfile* profile);
+
   void set_level(int level);
 
   int ALWAYS_INLINE level() const { return level_; }
@@ -527,6 +542,19 @@
   /// clearing them when results from the respective expr evaluators are no longer needed.
   MemPool* build_expr_results_pool_;
   MemPool* probe_expr_results_pool_;
+
+  /// The stats below can be used for debugging perf.
+  /// Number of Probe() calls that probe the hash table.
+  int64_t num_probes_ = 0;
+
+  /// Total distance traveled across all probes. That is, the sum over probes of
+  /// the difference between the end position of a probe (find/insert) and its
+  /// start position (hash & (num_buckets_ - 1)).
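+  /// (e.g. with linear probing, a probe that starts at bucket 5 and settles at
+  /// bucket 8 adds 3.)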
+  int64_t travel_length_ = 0;
+
+  /// The number of cases where we had to compare buckets with the same hash value, but
+  /// the row equality failed.
+  int64_t num_hash_collisions_ = 0;
 };
 
 /// HashTableStatsProfile encapsulates hash tables stats. It tracks the stats of all the
@@ -655,7 +683,7 @@
   /// Add operations stats of this hash table to the counters in profile.
   /// This method should only be called once for each HashTable and be called during
   /// closing the owner object of the HashTable. Not all the counters are added with the
-  /// method, only counters for Probes, travels, collisions and resizes are affected.
+  /// method; only the counter for resizes is affected.
   void StatsCountersAdd(HashTableStatsProfile* profile);
 
   /// Inserts the row to the hash table. The caller is responsible for ensuring that the
@@ -669,11 +697,12 @@
   /// only one tuple, a pointer to that tuple is stored. Otherwise the 'flat_row' pointer
   /// is stored. The 'row' is not copied by the hash table and the caller must guarantee
   /// it stays in memory. This will not grow the hash table.
-  bool IR_ALWAYS_INLINE Insert(HashTableCtx* ht_ctx,
+  bool IR_ALWAYS_INLINE Insert(HashTableCtx* __restrict__ ht_ctx,
       BufferedTupleStream::FlatRowPtr flat_row, TupleRow* row,
       Status* status) WARN_UNUSED_RESULT;
 
   /// Prefetch the hash table bucket which the given hash value 'hash' maps to.
+  /// Thread-safe for read-only hash tables.
   template <const bool READ>
   void IR_ALWAYS_INLINE PrefetchBucket(uint32_t hash);
 
@@ -685,26 +714,33 @@
   /// row. The matching rows do not need to be evaluated since all the nodes of a bucket
   /// are duplicates. One scan can be in progress for each 'ht_ctx'. Used in the probe
   /// phase of hash joins.
-  Iterator IR_ALWAYS_INLINE FindProbeRow(HashTableCtx* ht_ctx);
+  /// Thread-safe for read-only hash tables.
+  Iterator IR_ALWAYS_INLINE FindProbeRow(HashTableCtx* __restrict__ ht_ctx);
 
   /// If a match is found in the table, return an iterator as in FindProbeRow(). If a
   /// match was not present, return an iterator pointing to the empty bucket where the key
   /// should be inserted. Returns End() if the table is full. The caller can set the data
   /// in the bucket using a Set*() method on the iterator.
-  Iterator IR_ALWAYS_INLINE FindBuildRowBucket(HashTableCtx* ht_ctx, bool* found);
+  /// Thread-safe for read-only hash tables.
+  Iterator IR_ALWAYS_INLINE FindBuildRowBucket(
+      HashTableCtx* __restrict__ ht_ctx, bool* found);
 
   /// Returns number of elements inserted in the hash table
+  /// Thread-safe for read-only hash tables.
   int64_t size() const {
     return num_filled_buckets_ - num_buckets_with_duplicates_ + num_duplicate_nodes_;
   }
 
   /// Returns the number of empty buckets.
+  /// Thread-safe for read-only hash tables.
   int64_t EmptyBuckets() const { return num_buckets_ - num_filled_buckets_; }
 
   /// Returns the number of buckets
+  /// Thread-safe for read-only hash tables.
   int64_t num_buckets() const { return num_buckets_; }
 
   /// Returns the load factor (the number of non-empty buckets)
+  /// Thread-safe for read-only hash tables.
   double load_factor() const {
     return static_cast<double>(num_filled_buckets_) / num_buckets_;
   }
@@ -726,6 +762,7 @@
 
   /// Returns the memory occupied by the hash table, takes into account the number of
   /// duplicates.
+  /// Thread-safe for read-only hash tables.
   int64_t CurrentMemSize() const;
 
   /// Returns the number of inserts that can be performed before resizing the table.
@@ -738,7 +775,7 @@
   /// inserted without need to resize. If there is not enough memory available to
   /// resize the hash table, Status::OK() is returned and 'got_memory' is false. If a
   /// another error occurs, an error status may be returned.
-  Status CheckAndResize(uint64_t buckets_to_fill, const HashTableCtx* ht_ctx,
+  Status CheckAndResize(uint64_t buckets_to_fill, HashTableCtx* __restrict__ ht_ctx,
       bool* got_memory) WARN_UNUSED_RESULT;
 
   /// Returns the number of bytes allocated to the hash table from the block manager.
@@ -748,23 +785,28 @@
 
   /// Returns an iterator at the beginning of the hash table.  Advancing this iterator
   /// will traverse all elements.
+  /// Thread-safe for read-only hash tables.
   Iterator Begin(const HashTableCtx* ht_ctx);
 
   /// Return an iterator pointing to the first element (Bucket or DuplicateNode, if the
   /// bucket has duplicates) in the hash table that does not have its matched flag set.
   /// Used in right joins and full-outer joins.
+  /// Thread-safe for read-only hash tables.
   Iterator FirstUnmatched(HashTableCtx* ctx);
 
   /// Return true if there was a least one match.
+  /// Thread-safe for read-only hash tables.
   bool HasMatches() const { return has_matches_; }
 
   /// Return end marker.
+  /// Thread-safe for read-only hash tables.
   Iterator End() { return Iterator(); }
 
   /// Dump out the entire hash table to string.  If 'skip_empty', empty buckets are
   /// skipped.  If 'show_match', it also prints the matched flag of each node. If
   /// 'build_desc' is non-null, the build rows will be printed. Otherwise, only the
   /// the addresses of the build rows will be printed.
+  /// Thread-safe for read-only hash tables.
   std::string DebugString(bool skip_empty, bool show_match,
       const RowDescriptor* build_desc);
 
@@ -772,6 +814,7 @@
   void DebugStringTuple(std::stringstream& ss, HtData& htdata, const RowDescriptor* desc);
 
   /// Update and print some statistics that can be used for performance debugging.
+  /// Thread-safe for read-only hash tables.
   std::string PrintStats() const;
 
   /// stl-like iterator interface.
@@ -788,6 +831,7 @@
         node_(NULL) { }
 
     /// Iterates to the next element. It should be called only if !AtEnd().
+    /// Thread-safe for read-only hash tables.
     void IR_ALWAYS_INLINE Next();
 
     /// Iterates to the next duplicate node. If the bucket does not have duplicates or
@@ -795,39 +839,48 @@
     /// Used when we want to iterate over all the duplicate nodes bypassing the Next()
     /// interface (e.g. in semi/outer joins without other_join_conjuncts, in order to
     /// iterate over all nodes of an unmatched bucket).
+    /// Thread-safe for read-only hash tables.
     void IR_ALWAYS_INLINE NextDuplicate();
 
     /// Iterates to the next element that does not have its matched flag set. Used in
     /// right-outer and full-outer joins.
+    /// Thread-safe for read-only hash tables.
     void IR_ALWAYS_INLINE NextUnmatched();
 
     /// Return the current row or tuple. Callers must check the iterator is not AtEnd()
     /// before calling them.  The returned row is owned by the iterator and valid until
     /// the next call to GetRow(). It is safe to advance the iterator.
+    /// Thread-safe for read-only hash tables.
     TupleRow* IR_ALWAYS_INLINE GetRow() const;
     Tuple* IR_ALWAYS_INLINE GetTuple() const;
 
     /// Set the current tuple for an empty bucket. Designed to be used with the iterator
     /// returned from FindBuildRowBucket() in the case when the value is not found.  It is
     /// not valid to call this function if the bucket already has an entry.
+    /// Not thread-safe.
     void SetTuple(Tuple* tuple, uint32_t hash);
 
     /// Sets as matched the Bucket or DuplicateNode currently pointed by the iterator,
     /// depending on whether the bucket has duplicates or not. The iterator cannot be
     /// AtEnd().
+    /// Not thread-safe.
     void SetMatched();
 
     /// Returns the 'matched' flag of the current Bucket or DuplicateNode, depending on
     /// whether the bucket has duplicates or not. It should be called only if !AtEnd().
+    /// Thread-safe for read-only hash tables.
     bool IsMatched() const;
 
     /// Resets everything but the pointer to the hash table.
+    /// Not thread-safe.
     void SetAtEnd();
 
     /// Returns true if this iterator is at the end, i.e. GetRow() cannot be called.
+    /// Thread-safe for read-only hash tables.
     bool ALWAYS_INLINE AtEnd() const { return bucket_idx_ == BUCKET_NOT_FOUND; }
 
     /// Prefetch the hash table bucket which the iterator is pointing to now.
+    /// Thread-safe for read-only hash tables.
     template<const bool READ>
     void IR_ALWAYS_INLINE PrefetchBucket();
 
@@ -868,9 +921,10 @@
 
   /// Performs the probing operation according to the probing algorithm (linear or
   /// quadratic. Returns one of the following:
-  /// (a) the index of the bucket that contains the entry that matches with the last row
-  ///     evaluated in 'ht_ctx'. If 'ht_ctx' is NULL then it does not check for row
-  ///     equality and returns the index of the first empty bucket.
+  /// (a) the index of the bucket containing an entry whose hash matches 'hash'
+  ///     and, if COMPARE_ROW is true, whose row equals the last row evaluated in
+  ///     'ht_ctx'. If COMPARE_ROW is false, the index of the first bucket with a
+  ///     matching hash is returned.
   /// (b) the index of the first empty bucket according to the probing algorithm (linear
   ///     or quadratic), if the entry is not in the hash table or 'ht_ctx' is NULL.
   /// (c) Iterator::BUCKET_NOT_FOUND if the probe was not successful, i.e. the maximum
@@ -889,15 +943,16 @@
   /// 'found' indicates that a bucket that contains an equal row is found.
   ///
   /// There are wrappers of this function that perform the Find and Insert logic.
-  template <bool INCLUSIVE_EQUALITY>
+  template <bool INCLUSIVE_EQUALITY, bool COMPARE_ROW>
   int64_t IR_ALWAYS_INLINE Probe(Bucket* buckets, int64_t num_buckets,
-      HashTableCtx* ht_ctx, uint32_t hash, bool* found);
+      HashTableCtx* __restrict__ ht_ctx, uint32_t hash, bool* found);
 
   /// Performs the insert logic. Returns the HtData* of the bucket or duplicate node
   /// where the data should be inserted. Returns NULL if the insert was not successful
   /// and either sets 'status' to OK if it failed because not enough reservation was
   /// available or the error if an error was encountered.
-  HtData* IR_ALWAYS_INLINE InsertInternal(HashTableCtx* ht_ctx, Status* status);
+  HtData* IR_ALWAYS_INLINE InsertInternal(
+      HashTableCtx* __restrict__ ht_ctx, Status* status);
 
   /// Updates 'bucket_idx' to the index of the next non-empty bucket. If the bucket has
   /// duplicates, 'node' will be pointing to the head of the linked list of duplicates.
@@ -906,7 +961,8 @@
   void NextFilledBucket(int64_t* bucket_idx, DuplicateNode** node);
 
   /// Resize the hash table to 'num_buckets'. 'got_memory' is false on OOM.
-  Status ResizeBuckets(int64_t num_buckets, const HashTableCtx* ht_ctx, bool* got_memory);
+  Status ResizeBuckets(
+      int64_t num_buckets, HashTableCtx* __restrict__ ht_ctx, bool* got_memory);
 
   /// Appends the DuplicateNode pointed by next_node_ to 'bucket' and moves the next_node_
   /// pointer to the next DuplicateNode in the page, updating the remaining node counter.
@@ -1016,19 +1072,6 @@
   /// (IMPALA-1488).
   bool has_matches_ = false;
 
-  /// The stats below can be used for debugging perf.
-  /// Number of Probe() calls that probe the hash table.
-  int64_t num_probes_ = 0;
-
-  /// Total distance traveled for each probe. That is the sum of the diff between the end
-  /// position of a probe (find/insert) and its start position
-  /// (hash & (num_buckets_ - 1)).
-  int64_t travel_length_ = 0;
-
-  /// The number of cases where we had to compare buckets with the same hash value, but
-  /// the row equality failed.
-  int64_t num_hash_collisions_ = 0;
-
   /// How many times this table has resized so far.
   int64_t num_resizes_ = 0;
 };
diff --git a/be/src/exec/hash-table.inline.h b/be/src/exec/hash-table.inline.h
index 2bcecef..74f2470 100644
--- a/be/src/exec/hash-table.inline.h
+++ b/be/src/exec/hash-table.inline.h
@@ -48,13 +48,14 @@
   DCHECK_LE(cur_expr_values_hash_ - expr_values_hash_array_.get(), capacity_);
 }
 
-template <bool INCLUSIVE_EQUALITY>
+template <bool INCLUSIVE_EQUALITY, bool COMPARE_ROW>
 inline int64_t HashTable::Probe(Bucket* buckets, int64_t num_buckets,
-    HashTableCtx* ht_ctx, uint32_t hash, bool* found) {
-  DCHECK(buckets != NULL);
+    HashTableCtx* __restrict__ ht_ctx, uint32_t hash, bool* found) {
+  DCHECK(ht_ctx != nullptr);
+  DCHECK(buckets != nullptr);
   DCHECK_GT(num_buckets, 0);
   *found = false;
-  ++num_probes_;
+  ++ht_ctx->num_probes_;
   int64_t bucket_idx = hash & (num_buckets - 1);
 
   // In case of linear probing it counts the total number of steps for statistics and
@@ -65,18 +66,18 @@
     Bucket* bucket = &buckets[bucket_idx];
     if (LIKELY(!bucket->filled)) return bucket_idx;
     if (hash == bucket->hash) {
-      if (ht_ctx != NULL &&
-          ht_ctx->Equals<INCLUSIVE_EQUALITY>(GetRow(bucket, ht_ctx->scratch_row_))) {
+      if (COMPARE_ROW
+          && ht_ctx->Equals<INCLUSIVE_EQUALITY>(GetRow(bucket, ht_ctx->scratch_row_))) {
         *found = true;
         return bucket_idx;
       }
       // Row equality failed, or not performed. This is a hash collision. Continue
       // searching.
-      ++num_hash_collisions_;
+      ++ht_ctx->num_hash_collisions_;
     }
     // Move to the next bucket.
     ++step;
-    ++travel_length_;
+    ++ht_ctx->travel_length_;
     if (quadratic_probing()) {
       // The i-th probe location is idx = (hash + (step * (step + 1)) / 2) mod num_buckets.
       // This gives num_buckets unique idxs (between 0 and N-1) when num_buckets is a power
@@ -86,16 +87,17 @@
       bucket_idx = (bucket_idx + 1) & (num_buckets - 1);
     }
   } while (LIKELY(step < num_buckets));
+
   DCHECK_EQ(num_filled_buckets_, num_buckets) << "Probing of a non-full table "
       << "failed: " << quadratic_probing() << " " << hash;
   return Iterator::BUCKET_NOT_FOUND;
 }
 
 inline HashTable::HtData* HashTable::InsertInternal(
-    HashTableCtx* ht_ctx, Status* status) {
+    HashTableCtx* __restrict__ ht_ctx, Status* status) {
   bool found = false;
   uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
-  int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash, &found);
+  int64_t bucket_idx = Probe<true, true>(buckets_, num_buckets_, ht_ctx, hash, &found);
   DCHECK_NE(bucket_idx, Iterator::BUCKET_NOT_FOUND);
   if (found) {
     // We need to insert a duplicate node, note that this may fail to allocate memory.
@@ -108,7 +110,7 @@
   }
 }
 
-inline bool HashTable::Insert(HashTableCtx* ht_ctx,
+inline bool HashTable::Insert(HashTableCtx* __restrict__ ht_ctx,
     BufferedTupleStream::FlatRowPtr flat_row, TupleRow* row, Status* status) {
   HtData* htdata = InsertInternal(ht_ctx, status);
   // If successful insert, update the contents of the newly inserted entry with 'idx'.
@@ -134,10 +136,10 @@
   __builtin_prefetch(&buckets_[bucket_idx], READ ? 0 : 1, 1);
 }
 
-inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* ht_ctx) {
+inline HashTable::Iterator HashTable::FindProbeRow(HashTableCtx* __restrict__ ht_ctx) {
   bool found = false;
   uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
-  int64_t bucket_idx = Probe<false>(buckets_, num_buckets_, ht_ctx, hash, &found);
+  int64_t bucket_idx = Probe<false, true>(buckets_, num_buckets_, ht_ctx, hash, &found);
   if (found) {
     return Iterator(this, ht_ctx->scratch_row(), bucket_idx,
         stores_duplicates() ? buckets_[bucket_idx].bucketData.duplicates : NULL);
@@ -147,9 +149,9 @@
 
 // TODO: support lazy evaluation like HashTable::Insert().
 inline HashTable::Iterator HashTable::FindBuildRowBucket(
-    HashTableCtx* ht_ctx, bool* found) {
+    HashTableCtx* __restrict__ ht_ctx, bool* found) {
   uint32_t hash = ht_ctx->expr_values_cache()->CurExprValuesHash();
-  int64_t bucket_idx = Probe<true>(buckets_, num_buckets_, ht_ctx, hash, found);
+  int64_t bucket_idx = Probe<true, true>(buckets_, num_buckets_, ht_ctx, hash, found);
   DuplicateNode* duplicates = NULL;
   if (stores_duplicates() && LIKELY(bucket_idx != Iterator::BUCKET_NOT_FOUND)) {
     duplicates = buckets_[bucket_idx].bucketData.duplicates;
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 6df580d..c6752d6 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -288,8 +288,11 @@
 void PhjBuilder::Close(RuntimeState* state) {
   if (closed_) return;
   CloseAndDeletePartitions(nullptr);
-  if (ht_ctx_ != nullptr) ht_ctx_->Close(state);
-  ht_ctx_.reset();
+  if (ht_ctx_ != nullptr) {
+    ht_ctx_->StatsCountersAdd(ht_stats_profile_.get());
+    ht_ctx_->Close(state);
+    ht_ctx_.reset();
+  }
   for (const FilterContext& ctx : filter_ctxs_) {
     if (ctx.expr_eval != nullptr) ctx.expr_eval->Close(state);
   }
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 0dec0b8..fc8d075 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -316,6 +316,10 @@
   /// TODO: IMPALA-9176: improve the encapsulation of the null-aware partition.
   inline Partition* null_aware_partition() const { return null_aware_partition_.get(); }
 
+  /// TODO: IMPALA-9156: document thread safety for accessing this from
+  /// multiple PartitionedHashJoinNodes.
+  HashTableStatsProfile* ht_stats_profile() const { return ht_stats_profile_.get(); }
+
   std::string DebugString() const;
 
   /// A partition containing a subset of build rows.
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index a9fe4e9..32b4cd5 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -253,8 +253,11 @@
 
 void PartitionedHashJoinNode::Close(RuntimeState* state) {
   if (is_closed()) return;
-  if (ht_ctx_ != nullptr) ht_ctx_->Close(state);
-  ht_ctx_.reset();
+  if (ht_ctx_ != nullptr) {
+    ht_ctx_->StatsCountersAdd(builder_->ht_stats_profile());
+    ht_ctx_->Close(state);
+    ht_ctx_.reset();
+  }
   output_unmatched_batch_.reset();
   output_unmatched_batch_iter_.reset();
   CloseAndDeletePartitions(nullptr);