IMPALA-9126: part 4: hash join builder manages spilling
This is the final patch for IMPALA-9126.
This will allow the many:1 relationship of probe:build
partitions that we need for the shared join build.
Key changes:
* Builder picks the next spilled partition to process.
* Partitions are identified by unique ID so can be
decoupled between build and probe.
* unique_ptr is used to manage build partitions. This
helps document the lifecycle of the partitions better,
particularly when they are handed off to
PartitionedHashJoinNode.
Testing:
* Ran exhaustive tests.
* Ran a single node TPC-H and TPC-DS stress test with 1000 queries.
Perf:
Ran a single node TPC-H 30 test against master from
before IMPALA-9126 changes. No significant perf
change.
Change-Id: I6de5f62e3eacf80f72c8ea0ed8cba012f0f53c90
Reviewed-on: http://gerrit.cloudera.org:8080/14790
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
diff --git a/be/src/exec/partitioned-hash-join-builder-ir.cc b/be/src/exec/partitioned-hash-join-builder-ir.cc
index 8d8e42d..a4bfcdb 100644
--- a/be/src/exec/partitioned-hash-join-builder-ir.cc
+++ b/be/src/exec/partitioned-hash-join-builder-ir.cc
@@ -61,7 +61,7 @@
}
const uint32_t hash = expr_vals_cache->CurExprValuesHash();
const uint32_t partition_idx = hash >> (32 - NUM_PARTITIONING_BITS);
- Partition* partition = hash_partitions_[partition_idx];
+ Partition* partition = hash_partitions_[partition_idx].get();
if (UNLIKELY(!AppendRow(partition->build_rows(), build_row, &status))) {
return status;
}
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 8c1023a..a865e8f 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -187,13 +187,13 @@
Status PhjBuilder::FlushFinal(RuntimeState* state) {
int64_t num_build_rows = 0;
- for (Partition* partition : hash_partitions_) {
+ for (const unique_ptr<Partition>& partition : hash_partitions_) {
num_build_rows += partition->build_rows()->num_rows();
}
if (num_build_rows > 0) {
double largest_fraction = 0.0;
- for (Partition* partition : hash_partitions_) {
+ for (const unique_ptr<Partition>& partition : hash_partitions_) {
largest_fraction = max(largest_fraction,
partition->build_rows()->num_rows() / static_cast<double>(num_build_rows));
}
@@ -205,7 +205,7 @@
ss << Substitute("PHJ(node_id=$0) partitioned(level=$1) $2 rows into:", join_node_id_,
hash_partitions_[0]->level(), num_build_rows);
for (int i = 0; i < hash_partitions_.size(); ++i) {
- Partition* partition = hash_partitions_[i];
+ Partition* partition = hash_partitions_[i].get();
double percent = num_build_rows == 0 ? 0.0 : partition->build_rows()->num_rows()
* 100 / static_cast<double>(num_build_rows);
ss << " " << i << " " << (partition->is_spilled() ? "spilled" : "not spilled")
@@ -260,6 +260,7 @@
state_ = HashJoinState::PARTITIONING_BUILD;
expr_results_pool_->Clear();
non_empty_build_ = false;
+ next_partition_id_ = 0;
CloseAndDeletePartitions(row_batch);
}
@@ -303,24 +304,30 @@
return "";
}
-Status PhjBuilder::CreateAndPreparePartition(int level, Partition** partition) {
- all_partitions_.emplace_back(new Partition(runtime_state_, this, level));
- *partition = all_partitions_.back().get();
- RETURN_IF_ERROR((*partition)->build_rows()->Init(join_node_label_, true));
+Status PhjBuilder::CreateAndPreparePartition(
+ int level, unique_ptr<Partition>* partition) {
+ *partition = make_unique<Partition>(runtime_state_, this, level);
+ Status status = (*partition)->build_rows()->Init(join_node_label_, true);
+ if (!status.ok()) goto error;
bool got_buffer;
- RETURN_IF_ERROR((*partition)->build_rows()->PrepareForWrite(&got_buffer));
+ status = (*partition)->build_rows()->PrepareForWrite(&got_buffer);
+ if (!status.ok()) goto error;
DCHECK(got_buffer)
<< "Accounted in min reservation" << buffer_pool_client_->DebugString();
return Status::OK();
+ error:
+ (*partition)->Close(nullptr);
+ partition->reset();
+ return status;
}
Status PhjBuilder::CreateHashPartitions(int level) {
DCHECK(hash_partitions_.empty());
ht_ctx_->set_level(level); // Set the hash function for partitioning input.
for (int i = 0; i < PARTITION_FANOUT; ++i) {
- Partition* new_partition;
+ unique_ptr<Partition> new_partition;
RETURN_IF_ERROR(CreateAndPreparePartition(level, &new_partition));
- hash_partitions_.push_back(new_partition);
+ hash_partitions_.push_back(std::move(new_partition));
}
COUNTER_ADD(partitions_created_, PARTITION_FANOUT);
COUNTER_SET(max_partition_level_, level);
@@ -348,11 +355,11 @@
Partition* best_candidate = nullptr;
if (null_aware_partition_ != nullptr && null_aware_partition_->CanSpill()) {
// Spill null-aware partition first if possible - it is always processed last.
- best_candidate = null_aware_partition_;
+ best_candidate = null_aware_partition_.get();
} else {
// Iterate over the partitions and pick the largest partition to spill.
int64_t max_freed_mem = 0;
- for (Partition* candidate : hash_partitions_) {
+ for (const unique_ptr<Partition>& candidate : hash_partitions_) {
if (!candidate->CanSpill()) continue;
int64_t mem = candidate->build_rows()->BytesPinned(false);
if (candidate->hash_tbl() != nullptr) {
@@ -363,7 +370,7 @@
}
if (mem > max_freed_mem) {
max_freed_mem = mem;
- best_candidate = candidate;
+ best_candidate = candidate.get();
}
}
}
@@ -400,7 +407,7 @@
DCHECK_EQ(PARTITION_FANOUT, hash_partitions_.size());
for (int i = 0; i < PARTITION_FANOUT; ++i) {
- Partition* partition = hash_partitions_[i];
+ Partition* partition = hash_partitions_[i].get();
if (partition->build_rows()->num_rows() == 0) {
// This partition is empty, no need to do anything else.
partition->Close(nullptr);
@@ -424,7 +431,7 @@
RETURN_IF_ERROR(ReserveProbeBuffers(input_is_spilled));
for (int i = 0; i < PARTITION_FANOUT; ++i) {
- Partition* partition = hash_partitions_[i];
+ Partition* partition = hash_partitions_[i].get();
if (partition->IsClosed() || partition->is_spilled()) continue;
bool built = false;
@@ -458,7 +465,7 @@
RETURN_IF_ERROR(SpillPartition(
BufferedTupleStream::UNPIN_ALL, &spilled_partition));
// Don't need to create a probe stream for the null-aware partition.
- if (spilled_partition != null_aware_partition_) {
+ if (spilled_partition != null_aware_partition_.get()) {
addtl_reservation += per_stream_reservation;
}
}
@@ -487,33 +494,44 @@
probe_client->RestoreReservation(&probe_stream_reservation_, saved_reservation);
}
-int PhjBuilder::GetNumSpilledPartitions(const vector<Partition*>& partitions) {
+int PhjBuilder::GetNumSpilledPartitions(const vector<unique_ptr<Partition>>& partitions) {
int num_spilled = 0;
for (int i = 0; i < partitions.size(); ++i) {
- Partition* partition = partitions[i];
+ Partition* partition = partitions[i].get();
DCHECK(partition != nullptr);
if (!partition->IsClosed() && partition->is_spilled()) ++num_spilled;
}
return num_spilled;
}
-void PhjBuilder::DoneProbingHashPartitions(const bool retain_partition[PARTITION_FANOUT],
- list<Partition*>* output_partitions, RowBatch* batch) {
+void PhjBuilder::DoneProbingHashPartitions(
+ const int64_t num_spilled_probe_rows[PARTITION_FANOUT],
+ deque<unique_ptr<Partition>>* output_partitions, RowBatch* batch) {
DCHECK(output_partitions->empty());
+ if (state_ == HashJoinState::REPARTITIONING_PROBE) {
+ // Finished repartitioning this partition. Discard before pushing more spilled
+ // partitions onto 'spilled_partitions_'.
+ DCHECK(!spilled_partitions_.empty());
+ spilled_partitions_.pop_back();
+ }
+
for (int i = 0; i < PARTITION_FANOUT; ++i) {
- PhjBuilder::Partition* partition = hash_partitions_[i];
+ unique_ptr<PhjBuilder::Partition> partition = std::move(hash_partitions_[i]);
if (partition->IsClosed()) continue;
+ partition->IncrementNumSpilledProbeRows(num_spilled_probe_rows[i]);
if (partition->is_spilled()) {
DCHECK(partition->hash_tbl() == nullptr) << DebugString();
DCHECK_EQ(partition->build_rows()->BytesPinned(false), 0)
<< "Build was fully unpinned in BuildHashTablesAndPrepareProbeStreams()";
- // Release resources associated with completed partitions.
- if (!retain_partition[i]) {
+ if (partition->num_spilled_probe_rows() == 0
+ && !NeedToProcessUnmatchedBuildRows(join_op_)) {
COUNTER_ADD(num_hash_table_builds_skipped_, 1);
partition->Close(nullptr);
+ } else {
+ spilled_partitions_.push_back(std::move(partition));
}
} else if (NeedToProcessUnmatchedBuildRows(join_op_)) {
- output_partitions->push_back(partition);
+ output_partitions->push_back(std::move(partition));
} else {
// No more processing is required for this partition.
partition->Close(batch);
@@ -523,24 +541,30 @@
}
void PhjBuilder::DoneProbingSinglePartition(
- Partition* partition, std::list<Partition*>* output_partitions, RowBatch* batch) {
+ deque<unique_ptr<Partition>>* output_partitions, RowBatch* batch) {
if (NeedToProcessUnmatchedBuildRows(join_op_)) {
// If the build partition was in memory, we are done probing this partition.
// In case of right-outer, right-anti and full-outer joins, we move this partition
// to the list of partitions that we need to output their unmatched build rows.
- output_partitions->push_back(partition);
+ output_partitions->push_back(std::move(spilled_partitions_.back()));
} else {
// In any other case, just close the input build partition.
- partition->Close(IsLeftSemiJoin(join_op_) ? nullptr : batch);
+ spilled_partitions_.back()->Close(IsLeftSemiJoin(join_op_) ? nullptr : batch);
}
+ spilled_partitions_.pop_back();
}
void PhjBuilder::CloseAndDeletePartitions(RowBatch* row_batch) {
// Close all the partitions and clean up all references to them.
- for (unique_ptr<Partition>& partition : all_partitions_) partition->Close(row_batch);
- all_partitions_.clear();
+ for (unique_ptr<Partition>& partition : hash_partitions_) {
+ partition->Close(row_batch);
+ }
hash_partitions_.clear();
- null_aware_partition_ = nullptr;
+ for (unique_ptr<Partition>& partition : spilled_partitions_) {
+ partition->Close(row_batch);
+ }
+ spilled_partitions_.clear();
+ CloseNullAwarePartition();
}
void PhjBuilder::AllocateRuntimeFilters() {
@@ -609,13 +633,20 @@
}
}
-Status PhjBuilder::BeginSpilledProbe(bool empty_probe, Partition* partition,
- BufferPool::ClientHandle* probe_client, bool* repartitioned, int* level,
- HashPartitions* new_partitions) {
- DCHECK(partition->is_spilled());
+Status PhjBuilder::BeginSpilledProbe(
+ BufferPool::ClientHandle* probe_client, bool* repartitioned,
+ Partition** input_partition, HashPartitions* new_partitions) {
+ DCHECK(!spilled_partitions_.empty());
DCHECK_EQ(0, hash_partitions_.size());
+ // Pick the next spilled partition to process. The partition will stay in
+ // 'spilled_partitions_' until we are done probing it or repartitioning its probe.
+ // Thus it will remain valid as long as it's needed and always get cleaned up in
+ // Close(), even if an error occurs.
+ Partition* partition = spilled_partitions_.back().get();
+ *input_partition = partition;
+ DCHECK(partition->is_spilled()) << partition->DebugString();
- if (empty_probe) {
+ if (partition->num_spilled_probe_rows() == 0) {
// If there are no probe rows, there's no need to build the hash table, and
// only partitions with NeedToProcessUnmatcheBuildRows() will have been added
// to 'spilled_partitions_' in DoneProbingHashPartitions().
@@ -629,7 +660,6 @@
COUNTER_ADD(num_hash_table_builds_skipped_, 1);
UpdateState(HashJoinState::PROBING_SPILLED_PARTITION);
*repartitioned = false;
- *level = partition->level();
return Status::OK();
}
@@ -645,7 +675,6 @@
TransferProbeStreamReservation(probe_client);
UpdateState(HashJoinState::PROBING_SPILLED_PARTITION);
*repartitioned = false;
- *level = partition->level();
return Status::OK();
}
// This build partition still does not fit in memory, repartition.
@@ -679,7 +708,6 @@
}
TransferProbeStreamReservation(probe_client);
*repartitioned = true;
- *level = ht_ctx_->level();
*new_partitions = HashPartitions(ht_ctx_->level(), &hash_partitions_, non_empty_build_);
return Status::OK();
}
@@ -723,7 +751,7 @@
int64_t PhjBuilder::LargestPartitionRows() const {
int64_t max_rows = 0;
for (int i = 0; i < hash_partitions_.size(); ++i) {
- Partition* partition = hash_partitions_[i];
+ Partition* partition = hash_partitions_[i].get();
DCHECK(partition != nullptr);
if (partition->IsClosed()) continue;
int64_t rows = partition->build_rows()->num_rows();
@@ -740,7 +768,10 @@
}
PhjBuilder::Partition::Partition(RuntimeState* state, PhjBuilder* parent, int level)
- : parent_(parent), is_spilled_(false), level_(level) {
+ : parent_(parent),
+ id_(parent->next_partition_id_++),
+ is_spilled_(false),
+ level_(level) {
build_rows_ = make_unique<BufferedTupleStream>(state, parent_->row_desc_,
parent_->buffer_pool_client_, parent->spillable_buffer_size_,
parent->max_row_buffer_size_);
@@ -875,7 +906,7 @@
std::string PhjBuilder::Partition::DebugString() {
stringstream ss;
- ss << "<Partition>: ptr=" << this;
+ ss << "<Partition>: ptr=" << this << " id=" << id_;
if (IsClosed()) {
ss << " Closed";
return ss.str();
@@ -891,6 +922,7 @@
if (hash_tbl_ != nullptr) {
ss << " Hash Table Rows: " << hash_tbl_->size();
}
+ ss << " Spilled Probe Rows: " << num_spilled_probe_rows_ << endl;
return ss.str();
}
@@ -935,6 +967,11 @@
for (int i = 0; i < hash_partitions_.size(); ++i) {
ss << " Hash partition " << i << " " << hash_partitions_[i]->DebugString() << endl;
}
+ ss << " Spilled partitions: " << spilled_partitions_.size() << ":" << endl;
+ for (int i = 0; i < spilled_partitions_.size(); ++i) {
+ ss << " Spilled partition " << i << " "
+ << spilled_partitions_[i]->DebugString() << endl;
+ }
if (null_aware_partition_ != nullptr) {
ss << "Null-aware partition: " << null_aware_partition_->DebugString();
}
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 6e614e8..9ac23b1 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -18,10 +18,10 @@
#ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H
#define IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H
-#include <boost/scoped_ptr.hpp>
+#include <deque>
#include <memory>
-#include <list>
#include <vector>
+#include <boost/scoped_ptr.hpp>
#include "common/object-pool.h"
#include "common/status.h"
@@ -112,6 +112,8 @@
class Partition;
+ using PartitionId = int;
+
PhjBuilder(int join_node_id, const std::string& join_node_label, TJoinOp::type join_op,
const RowDescriptor* build_row_desc, RuntimeState* state,
BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
@@ -143,8 +145,9 @@
/// Represents a set of hash partitions to be handed off to the probe side.
struct HashPartitions {
HashPartitions() { Reset(); }
- HashPartitions(
- int level, const std::vector<Partition*>* hash_partitions, bool non_empty_build)
+ HashPartitions(int level,
+ const std::vector<std::unique_ptr<Partition>>* hash_partitions,
+ bool non_empty_build)
: level(level),
hash_partitions(hash_partitions),
non_empty_build(non_empty_build) {}
@@ -162,7 +165,7 @@
// The current set of hash partitions. Always contains PARTITION_FANOUT partitions.
// The partitions may be in-memory, spilled, or closed. Valid until
// DoneProbingHashPartitions() is called.
- const std::vector<Partition*>* hash_partitions;
+ const std::vector<std::unique_ptr<Partition>>* hash_partitions;
// True iff the build side had at least one row in a partition.
bool non_empty_build;
@@ -176,9 +179,10 @@
/// TODO: IMPALA-9156: this will be a synchronization point for shared join build.
HashPartitions BeginInitialProbe(BufferPool::ClientHandle* probe_client);
- /// Prepare to process the probe side of 'partition', either by building a hash
- /// table over 'partition', or if does not fit in memory, by repartitioning into
- /// PARTITION_FANOUT new partitions.
+ /// Pick a spilled partition to process (returned in *input_partition) and
+ /// prepare to probe it. Builds a hash table over *input_partition
+ /// if it fits in memory. Otherwise repartition it into PARTITION_FANOUT
+ /// new partitions.
///
/// When this function returns successfully, 'probe_client' will have enough
/// reservation for a read buffer for the input probe stream and, if repartitioning,
@@ -189,24 +193,24 @@
/// previous hash partitions must have been cleared with DoneProbingHashPartitions().
/// The new hash partitions are returned in 'new_partitions'.
/// TODO: IMPALA-9156: this will be a synchronization point for shared join build.
- Status BeginSpilledProbe(bool empty_probe, Partition* partition,
- BufferPool::ClientHandle* probe_client, bool* repartitioned, int* level,
- HashPartitions* new_partitions);
+ Status BeginSpilledProbe(BufferPool::ClientHandle* probe_client, bool* repartitioned,
+ Partition** input_partition, HashPartitions* new_partitions);
/// Called after probing of the hash partitions returned by BeginInitialProbe() or
- /// BeginSpilledProbe() (when *repartitioning as true) is complete,
- /// i.e. all of the corresponding probe rows have been processed by
- /// PartitionedHashJoinNode. Appends in-memory partitions that may contain build
- /// rows to output to 'output_partitions' for build modes like right outer join
- /// that output unmatched rows. Close other in-memory partitions, attaching any
- /// tuple data to 'batch' if 'batch' is non-NULL. Closes spilled partitions if
- /// 'retain_spilled_partition' is false for that partition index.
+ /// BeginSpilledProbe() (when *repartitioned as true) is complete, i.e. all of the
+ /// corresponding probe rows have been processed by PartitionedHashJoinNode. The number
+ /// of spilled probe rows per partition must be passed in via 'num_spilled_probe_rows'
+ /// so that the builder can determine whether a spilled partition needs to be retained.
+ /// Appends in-memory partitions that may contain build rows to output to
+ /// 'output_partitions' for build modes like right outer join that output unmatched
+ /// rows. Close other in-memory partitions, attaching any tuple data to 'batch' if
+ /// 'batch' is non-NULL. Closes spilled partitions if no more processing is needed.
/// TODO: IMPALA-9156: this will be a synchronization point for shared join build.
- void DoneProbingHashPartitions(const bool retain_spilled_partition[PARTITION_FANOUT],
- std::list<Partition*>* output_partitions, RowBatch* batch);
+ void DoneProbingHashPartitions(const int64_t num_spilled_probe_rows[PARTITION_FANOUT],
+ std::deque<std::unique_ptr<Partition>>* output_partitions, RowBatch* batch);
/// Called after probing of a single spilled partition returned by
- /// BeginSpilledProbe() when *repartitioning is false.
+ /// BeginSpilledProbe() when *repartitioned is false.
///
/// If the join op requires outputting unmatched build rows and the partition
/// may have build rows to return, it is appended to 'output_partitions'. Partitions
@@ -218,7 +222,7 @@
/// tuple data to 'batch' if 'batch' is non-NULL.
/// TODO: IMPALA-9156: this will be a synchronization point for shared join build.
void DoneProbingSinglePartition(
- Partition* partition, std::list<Partition*>* output_partitions, RowBatch* batch);
+ std::deque<std::unique_ptr<Partition>>* output_partitions, RowBatch* batch);
/// Close the null aware partition (if there is one) and set it to NULL.
/// TODO: IMPALA-9176: improve the encapsulation of the null-aware partition.
@@ -228,7 +232,7 @@
// from the probe side - i.e. the RowDescriptor for PartitionedHashJoinNode does
// not include the build tuple.
null_aware_partition_->Close(nullptr);
- null_aware_partition_ = nullptr;
+ null_aware_partition_.reset();
}
}
@@ -250,7 +254,7 @@
/// Accessor to allow PartitionedHashJoinNode to access null_aware_partition_.
/// TODO: IMPALA-9176: improve the encapsulation of the null-aware partition.
- inline Partition* null_aware_partition() const { return null_aware_partition_; }
+ inline Partition* null_aware_partition() const { return null_aware_partition_.get(); }
std::string DebugString() const;
@@ -304,10 +308,13 @@
bool ALWAYS_INLINE IsClosed() const { return build_rows_ == nullptr; }
BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); }
+ PartitionId id() const { return id_; }
bool ALWAYS_INLINE is_spilled() const { return is_spilled_; }
int ALWAYS_INLINE level() const { return level_; }
/// Return true if the partition can be spilled - is not closed and is not spilled.
bool CanSpill() const { return !IsClosed() && !is_spilled(); }
+ int64_t num_spilled_probe_rows() const { return num_spilled_probe_rows_; }
+ void IncrementNumSpilledProbeRows(int64_t count) { num_spilled_probe_rows_ += count; }
private:
/// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'flat_rows' is an array
@@ -325,6 +332,9 @@
const PhjBuilder* parent_;
+ /// Id for this partition that is unique within the builder.
+ const PartitionId id_;
+
/// True if this partition is spilled.
bool is_spilled_;
@@ -340,6 +350,10 @@
/// transferred to the parent exec node (via the row batch) when the partition
/// is closed. If NULL, ownership has been transferred and the partition is closed.
std::unique_ptr<BufferedTupleStream> build_rows_;
+
+ /// The number of spilled probe rows associated with this partition. Updated in
+ /// DoneProbingHashPartitions().
+ int64_t num_spilled_probe_rows_ = 0;
};
/// Computes the minimum reservation required to execute the spilling partitioned
@@ -374,8 +388,9 @@
/// After calling this, batches are added to the new partitions by calling Send().
Status CreateHashPartitions(int level) WARN_UNUSED_RESULT;
- /// Create a new partition in 'all_partitions_' and prepare it for writing.
- Status CreateAndPreparePartition(int level, Partition** partition) WARN_UNUSED_RESULT;
+ /// Create a new partition and prepare it for writing. Returns an error if initializing
+ /// the partition or allocating the write buffer fails.
+ Status CreateAndPreparePartition(int level, std::unique_ptr<Partition>* partition);
/// Reads the rows in build_batch and partitions them into hash_partitions_. If
/// 'build_filters' is true, runtime filters are populated. 'is_null_aware' is
@@ -432,7 +447,8 @@
Status ReserveProbeBuffers(bool input_is_spilled) WARN_UNUSED_RESULT;
/// Returns the number of partitions in 'partitions' that are spilled.
- static int GetNumSpilledPartitions(const std::vector<Partition*>& partitions);
+ static int GetNumSpilledPartitions(
+ const std::vector<std::unique_ptr<Partition>>& partitions);
/// Transfer reservation for probe streams to 'probe_client'. Memory for one stream was
/// reserved per spilled partition in FlushFinal(), plus the input stream if the input
@@ -488,8 +504,6 @@
RuntimeState* const runtime_state_;
/// The ID of the plan join node this is associated with.
- /// TODO: we may want to replace this with a sink ID once we progress further with
- /// multithreading.
const int join_node_id_;
/// The label of the plan join node this is associated with.
@@ -583,23 +597,34 @@
/// Set in FlushFinal() and not modified until Reset().
bool non_empty_build_ = false;
- /// Vector that owns all of the Partition objects.
- std::vector<std::unique_ptr<Partition>> all_partitions_;
+ /// Id to assign to the next partition created.
+ PartitionId next_partition_id_ = 0;
/// The current set of partitions that are being built or probed. This vector is
/// initialized before partitioning or re-partitioning the build input
/// and cleared after we've finished probing the partitions.
/// This is not used when processing a single spilled partition.
- std::vector<Partition*> hash_partitions_;
+ std::vector<std::unique_ptr<Partition>> hash_partitions_;
+
+ /// Spilled partitions that need further processing. Populated in
+ /// DoneProbingHashPartitions() with the spilled hash partitions.
+ ///
+ /// This is used as a stack to do a depth-first walk of spilled partitions (i.e. more
+ /// finely partitioned partitions are processed first). This allows us to delete spilled
+ /// data and bottom out the recursion earlier.
+ ///
+ /// spilled_partitions_.back() is the spilled partition being processed, if one is
+ /// currently being processed (i.e. between BeginSpilledProbe() and the corresponding
+ /// DoneProbing*() call).
+ std::vector<std::unique_ptr<PhjBuilder::Partition>> spilled_partitions_;
/// Partition used for null-aware joins. This partition is always processed at the end
/// after all build and probe rows are processed. In this partition's 'build_rows_', we
/// store all the rows for which 'build_expr_evals_' evaluated over the row returns
/// NULL (i.e. it has a NULL on the eq join slot).
/// NULL if the join is not null aware or we are done processing this partition.
- /// Always NULL once we are done processing the level 0 partitions.
/// This partition starts off in memory but can be spilled.
- Partition* null_aware_partition_ = nullptr;
+ std::unique_ptr<Partition> null_aware_partition_;
/// Populated during the hash table building phase if any partitions spilled.
/// Reservation for one probe stream write buffer per spilled partition is
diff --git a/be/src/exec/partitioned-hash-join-node-ir.cc b/be/src/exec/partitioned-hash-join-node-ir.cc
index 5038189..81052d1 100644
--- a/be/src/exec/partitioned-hash-join-node-ir.cc
+++ b/be/src/exec/partitioned-hash-join-node-ir.cc
@@ -304,7 +304,7 @@
} else {
// The build partition is either empty or spilled.
PhjBuilder::Partition* build_partition =
- (*build_hash_partitions_.hash_partitions)[partition_idx];
+ (*build_hash_partitions_.hash_partitions)[partition_idx].get();
ProbePartition* probe_partition = probe_hash_partitions_[partition_idx].get();
DCHECK((build_partition->IsClosed() && probe_partition == NULL)
|| (build_partition->is_spilled() && probe_partition != NULL));
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index cd66e8c..763111d 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -215,9 +215,7 @@
if (partition != NULL) partition->Close(row_batch);
}
probe_hash_partitions_.clear();
- for (unique_ptr<ProbePartition>& partition : spilled_partitions_) {
- partition->Close(row_batch);
- }
+ for (auto& entry : spilled_partitions_) entry.second->Close(row_batch);
spilled_partitions_.clear();
if (input_partition_ != NULL) {
input_partition_->Close(row_batch);
@@ -227,7 +225,7 @@
null_aware_probe_partition_->Close(row_batch);
null_aware_probe_partition_.reset();
}
- for (PhjBuilder::Partition* partition : output_build_partitions_) {
+ for (unique_ptr<PhjBuilder::Partition>& partition : output_build_partitions_) {
partition->Close(row_batch);
}
output_build_partitions_.clear();
@@ -376,21 +374,26 @@
DCHECK(probe_hash_partitions_.empty());
DCHECK(!spilled_partitions_.empty());
- // TODO: the builder should choose the spilled partition to process.
- input_partition_ = std::move(spilled_partitions_.front());
- spilled_partitions_.pop_front();
- PhjBuilder::Partition* build_partition = input_partition_->build_partition();
- DCHECK(build_partition->is_spilled());
+ PhjBuilder::Partition* build_input_partition;
+ bool repartitioned;
+ RETURN_IF_ERROR(builder_->BeginSpilledProbe(buffer_pool_client(), &repartitioned,
+ &build_input_partition, &build_hash_partitions_));
+
+ auto it = spilled_partitions_.find(build_input_partition->id());
+ DCHECK(it != spilled_partitions_.end())
+ << "All spilled build partitions must have a corresponding probe partition";
+ input_partition_ = std::move(it->second);
+ spilled_partitions_.erase(it);
+ DCHECK_EQ(build_input_partition, input_partition_->build_partition());
DCHECK_EQ(input_partition_->probe_rows()->BytesPinned(false), 0) << NodeDebugString();
- bool empty_probe = input_partition_->probe_rows()->num_rows() == 0;
- bool repartitioned;
- int level;
- RETURN_IF_ERROR(builder_->BeginSpilledProbe(empty_probe, build_partition,
- buffer_pool_client(), &repartitioned, &level, &build_hash_partitions_));
-
- ht_ctx_->set_level(level);
- if (empty_probe) {
+ ht_ctx_->set_level(build_input_partition->level() + (repartitioned ? 1 : 0));
+ if (!repartitioned && build_input_partition->hash_tbl() == nullptr) {
+ // Build skipped the hash table build, which can only happen if there are no probe
+ // rows.
+ DCHECK_EQ(0, input_partition_->probe_rows()->num_rows())
+ << build_input_partition->DebugString() << endl
+ << input_partition_->probe_rows()->DebugString();
return Status::OK();
} else if (repartitioned) {
RETURN_IF_ERROR(PrepareForPartitionedProbe());
@@ -951,7 +954,8 @@
// Validate the state of the partitions.
for (int i = 0; i < PARTITION_FANOUT; ++i) {
- PhjBuilder::Partition* build_partition = (*build_hash_partitions_.hash_partitions)[i];
+ PhjBuilder::Partition* build_partition =
+ (*build_hash_partitions_.hash_partitions)[i].get();
ProbePartition* probe_partition = probe_hash_partitions_[i].get();
if (build_partition->IsClosed()) {
DCHECK(hash_tbls_[i] == NULL);
@@ -973,7 +977,8 @@
*have_spilled_hash_partitions = false;
probe_hash_partitions_.resize(PARTITION_FANOUT);
for (int i = 0; i < PARTITION_FANOUT; ++i) {
- PhjBuilder::Partition* build_partition = (*build_hash_partitions_.hash_partitions)[i];
+ PhjBuilder::Partition* build_partition =
+ (*build_hash_partitions_.hash_partitions)[i].get();
if (build_partition->IsClosed() || !build_partition->is_spilled()) continue;
*have_spilled_hash_partitions = true;
DCHECK(probe_hash_partitions_[i] == nullptr);
@@ -1080,43 +1085,37 @@
// Need to clean up single in-memory build partition instead of hash partitions.
DCHECK(build_hash_partitions_.hash_partitions == nullptr);
DCHECK(input_partition_ != nullptr);
- builder_->DoneProbingSinglePartition(input_partition_->build_partition(),
+ builder_->DoneProbingSinglePartition(
&output_build_partitions_, IsLeftSemiJoin(join_op_) ? nullptr : batch);
} else {
- // Walk the partitions that had hash tables built for the probe phase and close them.
- // In the case of right outer and full outer joins, instead of closing those
- // partitions, add them to the list of partitions that need to output any unmatched
- // build rows. This partition will be closed by the function that actually outputs
- // unmatched build rows.
+ // Walk the partitions that had hash tables built for the probe phase and either
+ // close them or move them to 'spilled_partitions_'.
DCHECK_EQ(build_hash_partitions_.hash_partitions->size(), PARTITION_FANOUT);
DCHECK_EQ(probe_hash_partitions_.size(), PARTITION_FANOUT);
- // The build partitions we need to retain for further processing.
- bool retain_spilled_partition[PARTITION_FANOUT] = {false};
+ int64_t num_spilled_probe_rows[PARTITION_FANOUT] = {0};
for (int i = 0; i < PARTITION_FANOUT; ++i) {
ProbePartition* probe_partition = probe_hash_partitions_[i].get();
+ PhjBuilder::Partition* build_partition =
+ (*build_hash_partitions_.hash_partitions)[i].get();
if (probe_partition == nullptr) {
// Partition was not spilled.
if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
// For NAAJ, we need to try to match the NULL probe rows with this build
// partition before we are done with it.
- PhjBuilder::Partition* build_partition =
- (*build_hash_partitions_.hash_partitions)[i];
if (!build_partition->IsClosed()) {
RETURN_IF_ERROR(EvaluateNullProbe(state, build_partition->build_rows()));
}
}
} else if (probe_partition->probe_rows()->num_rows() != 0
|| NeedToProcessUnmatchedBuildRows(join_op_)) {
- retain_spilled_partition[i] = true;
+ num_spilled_probe_rows[i] = probe_partition->probe_rows()->num_rows();
// Unpin the probe stream to free up more memory. We need to free all memory so we
// can recurse the algorithm and create new hash partitions from spilled
// partitions.
RETURN_IF_ERROR(
probe_partition->probe_rows()->UnpinStream(BufferedTupleStream::UNPIN_ALL));
- // Push newly created partitions at the front. This means a depth first walk
- // (more finely partitioned partitions are processed first). This allows us
- // to delete blocks earlier and bottom out the recursion earlier.
- spilled_partitions_.push_front(std::move(probe_hash_partitions_[i]));
+ spilled_partitions_.emplace(
+ build_partition->id(), std::move(probe_hash_partitions_[i]));
} else {
// There's no more processing to do for this partition, and since there were no
// probe rows we didn't return any rows that reference memory from these
@@ -1126,7 +1125,7 @@
}
probe_hash_partitions_.clear();
build_hash_partitions_.Reset();
- builder_->DoneProbingHashPartitions(retain_spilled_partition,
+ builder_->DoneProbingHashPartitions(num_spilled_probe_rows,
&output_build_partitions_, IsLeftSemiJoin(join_op_) ? nullptr : batch);
}
if (input_partition_ != nullptr) {
@@ -1135,7 +1134,7 @@
}
if (!output_build_partitions_.empty()) {
DCHECK(output_unmatched_batch_iter_.get() == nullptr);
- PhjBuilder::Partition* output_partition = output_build_partitions_.front();
+ PhjBuilder::Partition* output_partition = output_build_partitions_.front().get();
if (output_partition->hash_tbl() != nullptr) {
hash_tbl_iterator_ = output_partition->hash_tbl()->FirstUnmatched(ht_ctx_.get());
} else {
@@ -1184,13 +1183,14 @@
if (!spilled_partitions_.empty()) {
ss << "SpilledPartitions" << endl;
- for (const unique_ptr<ProbePartition>& probe_partition : spilled_partitions_) {
+ for (const auto& entry : spilled_partitions_) {
+ ProbePartition* probe_partition = entry.second.get();
PhjBuilder::Partition* build_partition = probe_partition->build_partition();
DCHECK(build_partition->is_spilled());
DCHECK(build_partition->hash_tbl() == NULL);
DCHECK(build_partition->build_rows() != NULL);
DCHECK(probe_partition->probe_rows() != NULL);
- ss << " Partition=" << probe_partition.get() << endl
+ ss << " ProbePartition (id=" << entry.first << "):" << probe_partition << endl
<< " Spilled Build Rows: " << build_partition->build_rows()->num_rows() << endl
<< " Spilled Probe Rows: " << probe_partition->probe_rows()->num_rows()
<< endl;
diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h
index ba8f141..5ff8ab2 100644
--- a/be/src/exec/partitioned-hash-join-node.h
+++ b/be/src/exec/partitioned-hash-join-node.h
@@ -557,11 +557,11 @@
/// have somewhere to spill the probe rows for the spilled partition).
std::vector<std::unique_ptr<ProbePartition>> probe_hash_partitions_;
- /// The list of probe partitions that have been spilled and still need more
- /// processing. These partitions could need repartitioning, in which case more
- /// partitions will be added to this list after repartitioning.
+ /// Probe partitions that have been spilled and still need more processing. Each of
+ /// these has a corresponding build partition in 'builder_' with the same PartitionId.
/// This list is populated at DoneProbing().
- std::list<std::unique_ptr<ProbePartition>> spilled_partitions_;
+ std::unordered_map<PhjBuilder::PartitionId, std::unique_ptr<ProbePartition>>
+ spilled_partitions_;
/// The current spilled probe partition being processed as input to repartitioning,
/// or the source of the probe rows if the hash table fits in memory.
@@ -570,7 +570,7 @@
/// In the case of right-outer and full-outer joins, this is the list of the partitions
/// for which we need to output their unmatched build rows. This list is populated at
/// DoneProbing(). If this is non-empty, probe_state_ must be OUTPUTTING_UNMATCHED.
- std::list<PhjBuilder::Partition*> output_build_partitions_;
+ std::deque<std::unique_ptr<PhjBuilder::Partition>> output_build_partitions_;
/// Partition used if 'null_aware_' is set. During probing, rows from the probe
/// side that did not have a match in the hash table are appended to this partition.