blob: 7e9fdf730bf5a5a1bc845b945781b300d10d4f19 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H
#define IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H
#include <boost/scoped_ptr.hpp>
#include <memory>
#include <list>
#include <vector>
#include "common/object-pool.h"
#include "common/status.h"
#include "exec/data-sink.h"
#include "exec/filter-context.h"
#include "exec/hash-table.h"
#include "exec/join-op.h"
#include "runtime/buffered-tuple-stream.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/bufferpool/suballocator.h"
namespace impala {
class RowDescriptor;
class RuntimeState;
class ScalarExpr;
class ScalarExprEvaluator;
/// The build side for the PartitionedHashJoinNode. Build-side rows are hash-partitioned
/// into PARTITION_FANOUT partitions, with partitions spilled if the full build side
/// does not fit in memory. Spilled partitions can be repartitioned with a different
/// hash function per level of repartitioning.
///
/// The builder owns the hash tables and build row streams. The builder first does the
/// level 0 partitioning of build rows. After FlushFinal() the builder has produced some
/// in-memory partitions and some spilled partitions. The in-memory partitions have hash
/// tables and the spilled partitions have a probe-side stream prepared with one write
/// buffer, which is sufficient to spill the partition's probe rows to disk without
/// allocating additional buffers.
///
/// After this initial partitioning, the join node probes the in-memory hash partitions.
/// The join node then drives processing of any spilled partitions, calling
/// Partition::BuildHashTable() to build hash tables for a spilled partition or calling
/// RepartitionBuildInput() to repartition a level n partition into multiple level n + 1
/// partitions.
///
/// Both the PartitionedHashJoinNode and the builder share a BufferPool client
/// and the corresponding reservations. Different stages of the spilling algorithm
/// require different mixes of build and probe buffers and hash tables, so we can
/// share the reservation to minimize the combined memory requirement. Initial probe-side
/// buffers are allocated in the builder then handed off to the probe side to implement
/// this reservation sharing.
///
/// TODO: after we have reliable reservations (IMPALA-3200), we can simplify the handoff
/// to the probe side by using reservations instead of preparing the streams.
///
/// The full hash join algorithm is documented in PartitionedHashJoinNode.
class PhjBuilder : public DataSink {
public:
/// Number of initial partitions to create. Must be a power of two.
static const int PARTITION_FANOUT = 16;
/// Needs to be log2(PARTITION_FANOUT).
static const int NUM_PARTITIONING_BITS = 4;
/// Maximum number of times we will repartition. The maximum build table we
/// can process is:
/// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). With a (low) 1GB
/// limit and 64 fanout, we can support 256TB build tables in the case where
/// there is no skew.
/// In the case where there is skew, repartitioning is unlikely to help (assuming a
/// reasonable hash function).
/// Note that we need to have at least as many SEED_PRIMES in HashTableCtx.
/// TODO: we can revisit and try harder to explicitly detect skew.
static const int MAX_PARTITION_DEPTH = 16;
class Partition;
PhjBuilder(int join_node_id, const std::string& join_node_label, TJoinOp::type join_op,
const RowDescriptor* probe_row_desc, const RowDescriptor* build_row_desc,
RuntimeState* state, BufferPool::ClientHandle* buffer_pool_client,
int64_t spillable_buffer_size, int64_t max_row_buffer_size);
Status InitExprsAndFilters(RuntimeState* state,
const std::vector<TEqJoinCondition>& eq_join_conjuncts,
const std::vector<TRuntimeFilterDesc>& filters) WARN_UNUSED_RESULT;
/// Implementations of DataSink interface methods.
virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
virtual Status Open(RuntimeState* state) override;
virtual Status Send(RuntimeState* state, RowBatch* batch) override;
virtual Status FlushFinal(RuntimeState* state) override;
virtual void Close(RuntimeState* state) override;
/// Does all codegen for the builder (if codegen is enabled).
/// Updates the the builder's runtime profile with info about whether any errors
/// occured during codegen.
virtual void Codegen(LlvmCodeGen* codegen) override;
/////////////////////////////////////////
// The following functions are used only by PartitionedHashJoinNode.
/////////////////////////////////////////
/// Reset the builder to the same state as it was in after calling Open().
void Reset(RowBatch* row_batch);
/// Transfer ownership of the probe streams to the caller. One stream was allocated per
/// spilled partition in FlushFinal(). The probe streams are empty but prepared for
/// writing with a write buffer allocated.
std::vector<std::unique_ptr<BufferedTupleStream>> TransferProbeStreams();
/// Called after probing of the partitions is done. Appends in-memory partitions that
/// may contain build rows to output to 'output_partitions' for build modes like
/// right outer join that output unmatched rows. Close other in-memory partitions,
/// attaching any tuple data to 'batch' if 'batch' is non-NULL. Closes spilled
/// partitions if 'retain_spilled_partition' is false for that partition index.
/// Invalid to call hash_partition() after this is called.
void DoneProbing(const bool retain_spilled_partition[PARTITION_FANOUT],
std::list<Partition*>* output_partitions, RowBatch* batch);
/// Close the null aware partition (if there is one) and set it to NULL.
void CloseNullAwarePartition() {
if (null_aware_partition_ != nullptr) {
// We don't need to pass in a batch because the anti-join only returns tuple data
// from the probe side - i.e. the RowDescriptor for PartitionedHashJoinNode does
// not include the build tuple.
null_aware_partition_->Close(nullptr);
null_aware_partition_ = nullptr;
}
}
/// Creates new hash partitions and repartitions 'input_partition'. The previous
/// hash partitions must have been cleared with ClearHashPartitions().
/// 'level' is the level new partitions should be created with. This functions prepares
/// 'input_probe_rows' for reading in "delete_on_read" mode, so that the probe phase
/// has enough buffers preallocated to execute successfully.
Status RepartitionBuildInput(Partition* input_partition, int level,
BufferedTupleStream* input_probe_rows) WARN_UNUSED_RESULT;
/// Returns the largest build row count out of the current hash partitions.
int64_t LargestPartitionRows() const;
/// True if the hash table may contain rows with one or more NULL join keys. This
/// depends on the join type and the equijoin conjuncts.
bool HashTableStoresNulls() const;
void AddHashTableStatsToProfile(RuntimeProfile* profile);
/// Accessor functions, mainly required to expose state to PartitionedHashJoinNode.
inline bool non_empty_build() const { return non_empty_build_; }
inline const std::vector<bool>& is_not_distinct_from() const {
return is_not_distinct_from_;
}
inline int num_hash_partitions() const { return hash_partitions_.size(); }
inline Partition* hash_partition(int partition_idx) const {
DCHECK_GE(partition_idx, 0);
DCHECK_LT(partition_idx, hash_partitions_.size());
return hash_partitions_[partition_idx];
}
inline Partition* null_aware_partition() const { return null_aware_partition_; }
std::string DebugString() const;
/// A partition containing a subset of build rows.
///
/// A partition may contain two data structures: the build rows and optionally a hash
/// table built over the build rows. Building the hash table requires all build rows to
/// be pinned in memory. The build rows are kept in memory if all partitions fit in
/// memory, but can be unpinned and spilled to disk to free up memory. Reading or
/// writing the unpinned rows requires a single read or write buffer. If the unpinned
/// rows are not being read or written, they can be completely unpinned, requiring no
/// buffers.
///
/// The build input is first partitioned by hash function level 0 into the level 0
/// partitions. Then, if the join spills and the size of a level n partition is too
/// large to fit in memory, the partition's rows can be repartitioned with the level
/// n + 1 hash function into the level n + 1 partitions.
class Partition {
public:
Partition(RuntimeState* state, PhjBuilder* parent, int level);
~Partition();
/// Close the partition and attach resources to 'batch' if non-NULL or free the
/// resources if 'batch' is NULL. Idempotent.
void Close(RowBatch* batch);
/// Returns the estimated byte size of the in-memory data structures for this
/// partition. This includes all build rows and the hash table.
int64_t EstimatedInMemSize() const;
/// Pins the build tuples for this partition and constructs the hash table from it.
/// Build rows cannot be added after calling this. If the build rows could not be
/// pinned or the hash table could not be built due to memory pressure, sets *built
/// to false and returns OK. Returns an error status if any other error is
/// encountered.
Status BuildHashTable(bool* built) WARN_UNUSED_RESULT;
/// Spills this partition, the partition's stream is unpinned with 'mode' and
/// its hash table is destroyed if it was built. Calling with 'mode' UNPIN_ALL
/// unpins all pages and frees all buffers associated with the partition so that
/// the partition does not use any reservation. Calling with 'mode'
/// UNPIN_ALL_EXCEPT_CURRENT may leave the read or write pages of the unpinned stream
/// pinned and therefore using reservation. If the partition was previously
/// spilled with mode UNPIN_ALL_EXCEPT_CURRENT, then calling Spill() again with
/// UNPIN_ALL may release more reservation by unpinning the read or write page
/// in the stream.
Status Spill(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT;
std::string DebugString();
bool ALWAYS_INLINE IsClosed() const { return build_rows_ == NULL; }
BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); }
HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); }
bool ALWAYS_INLINE is_spilled() const { return is_spilled_; }
int ALWAYS_INLINE level() const { return level_; }
/// Return true if the partition can be spilled - is not closed and is not spilled.
bool CanSpill() const { return !IsClosed() && !is_spilled(); }
private:
/// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'flat_rows' is an array
/// containing the rows in the hash table's tuple stream.
/// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash
/// table buckets which the rows hashes to will be prefetched. This parameter is
/// replaced with a constant during codegen time. This function may be replaced with
/// a codegen'd version. Returns true if all rows in 'batch' are successfully
/// inserted and false otherwise. If inserting failed, 'status' indicates why it
/// failed: if 'status' is ok, inserting failed because not enough reservation
/// was available and if 'status' is an error, inserting failed because of that error.
bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx,
RowBatch* batch, const std::vector<BufferedTupleStream::FlatRowPtr>& flat_rows,
Status* status);
const PhjBuilder* parent_;
/// True if this partition is spilled.
bool is_spilled_;
/// How many times rows in this partition have been repartitioned. Partitions created
/// from the node's children's input is level 0, 1 after the first repartitioning,
/// etc.
const int level_;
/// The hash table for this partition.
boost::scoped_ptr<HashTable> hash_tbl_;
/// Stream of build tuples in this partition. Initially owned by this object but
/// transferred to the parent exec node (via the row batch) when the partition
/// is closed. If NULL, ownership has been transferred and the partition is closed.
std::unique_ptr<BufferedTupleStream> build_rows_;
};
/// Computes the minimum reservation required to execute the spilling partitioned
/// hash algorithm successfully for any input size (assuming enough disk space is
/// available for spilled rows). The buffers are used for buffering both build and
/// probe rows at different times, so the total requirement is the peak sum of build
/// and probe buffers required.
/// We need one output buffer per partition to partition the build or probe side. We
/// need one additional buffer for the input while repartitioning the build or probe.
/// For NAAJ, we need 3 additional buffers for 'null_aware_partition_',
/// 'null_aware_probe_partition_' and 'null_probe_rows_'.
int64_t MinReservation() const {
// Must be kept in sync with HashJoinNode.computeNodeResourceProfile() in fe.
int num_reserved_buffers = PARTITION_FANOUT + 1;
if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) num_reserved_buffers += 3;
// Two of the buffers must fit the maximum row.
return spillable_buffer_size_ * (num_reserved_buffers - 2) + max_row_buffer_size_ * 2;
}
/// Class name in LLVM IR.
static const char* LLVM_CLASS_NAME;
private:
/// Create and initialize a set of hash partitions for partitioning level 'level'.
/// The previous hash partitions must have been cleared with ClearHashPartitions().
/// After calling this, batches are added to the new partitions by calling Send().
Status CreateHashPartitions(int level) WARN_UNUSED_RESULT;
/// Create a new partition in 'all_partitions_' and prepare it for writing.
Status CreateAndPreparePartition(int level, Partition** partition) WARN_UNUSED_RESULT;
/// Reads the rows in build_batch and partitions them into hash_partitions_. If
/// 'build_filters' is true, runtime filters are populated. 'is_null_aware' is
/// set to true if the join type is a null aware join.
Status ProcessBuildBatch(
RowBatch* build_batch, HashTableCtx* ctx, bool build_filters,
bool is_null_aware) WARN_UNUSED_RESULT;
/// Append 'row' to 'stream'. In the common case, appending the row to the stream
/// immediately succeeds. Otherwise this function falls back to the slower path of
/// AppendRowStreamFull(), which may spill partitions to free memory. Returns false
/// and sets 'status' if it was unable to append the row, even after spilling
/// partitions. This odd return convention is used to avoid emitting unnecessary code
/// for ~Status in perf-critical code.
bool AppendRow(
BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT;
/// Slow path for AppendRow() above. It is called when the stream has failed to append
/// the row. We need to find more memory by either switching to IO-buffers, in case the
/// stream still uses small buffers, or spilling a partition. Returns false and sets
/// 'status' if it was unable to append the row, even after spilling partitions.
bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row,
Status* status) noexcept WARN_UNUSED_RESULT;
/// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed
/// to the Spill() call for the selected partition. The current policy is to spill the
/// null-aware partition first (if a NAAJ), then the largest partition. Returns non-ok
/// status if we couldn't spill a partition. If 'spilled_partition' is non-NULL, set
/// to the partition that was the one spilled.
Status SpillPartition(BufferedTupleStream::UnpinMode mode,
Partition** spilled_partition = nullptr) WARN_UNUSED_RESULT;
/// Tries to build hash tables for all unspilled hash partitions. Called after
/// FlushFinal() when all build rows have been partitioned and added to the appropriate
/// streams. If the hash table could not be built for a partition, the partition is
/// spilled (with all build blocks unpinned) and the probe stream is prepared for
/// writing (i.e. has an initial probe buffer allocated).
///
/// When this function returns successfully, each partition is in one of these states:
/// 1. closed. No probe partition is created and the build partition is closed.
/// 2. in-memory. The build rows are pinned and has a hash table built. No probe
/// partition is created.
/// 3. spilled. The build rows are fully unpinned and the probe stream is prepared.
Status BuildHashTablesAndPrepareProbeStreams() WARN_UNUSED_RESULT;
/// Ensures that 'spilled_partition_probe_streams_' has a stream per spilled partition
/// in 'hash_partitions_'. May spill additional partitions until it can create enough
/// probe streams with write buffers. Returns an error if an error is encountered or
/// if it runs out of partitions to spill.
Status InitSpilledPartitionProbeStreams() WARN_UNUSED_RESULT;
/// Calls Close() on every Partition, deletes them, and cleans up any pointers that
/// may reference them. Also cleans up 'spilled_partition_probe_streams_'. If
/// 'row_batch' if not NULL, transfers the ownership of all row-backing resources to it.
void CloseAndDeletePartitions(RowBatch* row_batch);
/// For each filter in filters_, allocate a bloom_filter from the fragment-local
/// RuntimeFilterBank and store it in runtime_filters_ to populate during the build
/// phase.
void AllocateRuntimeFilters();
/// Iterates over the runtime filters in filters_ and inserts each row into each filter.
/// This is replaced at runtime with code generated by CodegenInsertRuntimeFilters().
void InsertRuntimeFilters(TupleRow* build_row) noexcept;
/// Publish the runtime filters to the fragment-local RuntimeFilterBank.
/// 'num_build_rows' is used to determine whether the computed filters have an
/// unacceptably high false-positive rate.
void PublishRuntimeFilters(int64_t num_build_rows);
/// Codegen processing build batches. Identical signature to ProcessBuildBatch().
/// Returns non-OK status if codegen was not possible.
Status CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn,
llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
llvm::Function* insert_filters_fn) WARN_UNUSED_RESULT;
/// Codegen inserting batches into a partition's hash table. Identical signature to
/// Partition::InsertBatch(). Returns non-OK if codegen was not possible.
Status CodegenInsertBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn,
llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn,
TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
/// Codegen inserting rows into runtime filters. Identical signature to
/// InsertRuntimeFilters(). Returns non-OK if codegen was not possible.
Status CodegenInsertRuntimeFilters(
LlvmCodeGen* codegen, const vector<ScalarExpr*>& filter_exprs, llvm::Function** fn);
RuntimeState* const runtime_state_;
/// The ID of the plan join node this is associated with.
/// TODO: we may want to replace this with a sink ID once we progress further with
/// multithreading.
const int join_node_id_;
/// The label of the plan join node this is associated with.
const std::string join_node_label_;
/// The join operation this is building for.
const TJoinOp::type join_op_;
/// Descriptor for the probe rows, needed to initialize probe streams.
const RowDescriptor* probe_row_desc_;
/// Pool for objects with same lifetime as builder.
ObjectPool obj_pool_;
/// Client to the buffer pool, used to allocate build partition buffers and hash tables.
/// When probing, the spilling algorithm keeps some build partitions in memory while
/// using memory for probe buffers for spilled partitions. To support dynamically
/// dividing memory between build and probe, this client is shared between the builder
/// and the PartitionedHashJoinNode.
/// TODO: this approach to sharing will not work for spilling broadcast joins with a
/// 1:N relationship from builders to join nodes.
BufferPool::ClientHandle* buffer_pool_client_;
/// The default and max buffer sizes to use in the build and probe streams.
const int64_t spillable_buffer_size_;
const int64_t max_row_buffer_size_;
/// Allocator for hash table memory.
boost::scoped_ptr<Suballocator> ht_allocator_;
/// If true, the build side has at least one row.
bool non_empty_build_;
/// Expressions over input rows for hash table build.
std::vector<ScalarExpr*> build_exprs_;
/// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS
/// NOT DISTINCT FROM, rather than equality.
std::vector<bool> is_not_distinct_from_;
/// Expressions for evaluating input rows for insertion into runtime filters.
std::vector<ScalarExpr*> filter_exprs_;
/// List of filters to build. One-to-one correspondence with exprs in 'filter_exprs_'.
std::vector<FilterContext> filter_ctxs_;
/// Used for hash-related functionality, such as evaluating rows and calculating hashes.
/// The level is set to the same level as 'hash_partitions_'.
boost::scoped_ptr<HashTableCtx> ht_ctx_;
/// Counters and profile objects for HashTable stats
std::unique_ptr<HashTableStatsProfile> ht_stats_profile_;
/// Total number of partitions created.
RuntimeProfile::Counter* partitions_created_;
/// The largest fraction (of build side) after repartitioning. This is expected to be
/// 1 / PARTITION_FANOUT. A value much larger indicates skew.
RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
/// Level of max partition (i.e. number of repartitioning steps).
RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
/// Number of build rows that have been partitioned.
RuntimeProfile::Counter* num_build_rows_partitioned_;
/// Number of partitions that have been spilled.
RuntimeProfile::Counter* num_spilled_partitions_;
/// Number of partitions that have been repartitioned.
RuntimeProfile::Counter* num_repartitions_;
/// Time spent partitioning build rows.
RuntimeProfile::Counter* partition_build_rows_timer_;
/// Time spent building hash tables.
RuntimeProfile::Counter* build_hash_table_timer_;
/// Time spent repartitioning and building hash tables of any resulting partitions
/// that were not spilled.
RuntimeProfile::Counter* repartition_timer_;
/////////////////////////////////////////
/// BEGIN: Members that must be Reset()
/// Vector that owns all of the Partition objects.
std::vector<std::unique_ptr<Partition>> all_partitions_;
/// The current set of partitions that are being built or probed. This vector is
/// initialized before partitioning or re-partitioning the build input
/// and cleared after we've finished probing the partitions.
/// This is not used when processing a single spilled partition.
std::vector<Partition*> hash_partitions_;
/// Partition used for null-aware joins. This partition is always processed at the end
/// after all build and probe rows are processed. In this partition's 'build_rows_', we
/// store all the rows for which 'build_expr_evals_' evaluated over the row returns
/// NULL (i.e. it has a NULL on the eq join slot).
/// NULL if the join is not null aware or we are done processing this partition.
/// Always NULL once we are done processing the level 0 partitions.
/// This partition starts off in memory but can be spilled.
Partition* null_aware_partition_;
/// Populated during the hash table building phase if any partitions spilled.
/// One probe stream per spilled partition is prepared for writing so that the
/// initial write buffer is allocated.
///
/// These streams are handed off to PartitionedHashJoinNode for use in buffering
/// spilled probe rows. The allocation is done in the builder so that it can divide
/// memory between the in-memory build partitions and write buffers based on the size
/// of the partitions and available memory. E.g. if all the partitions fit in memory, no
/// write buffers need to be allocated, but if some partitions are spilled, more build
/// partitions may be spilled to free up memory for write buffers.
///
/// Because of this, at the end of the build phase, we always have sufficient memory
/// to execute the probe phase of the algorithm without spilling more partitions.
std::vector<std::unique_ptr<BufferedTupleStream>> spilled_partition_probe_streams_;
/// END: Members that must be Reset()
/////////////////////////////////////////
/// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available
/// and is used when the partition level is 0, otherwise xxx_fn_ uses murmur hash and is
/// used for subsequent levels.
typedef Status (*ProcessBuildBatchFn)(
PhjBuilder*, RowBatch*, HashTableCtx*, bool build_filters, bool is_null_aware);
/// Jitted ProcessBuildBatch function pointers. NULL if codegen is disabled.
ProcessBuildBatchFn process_build_batch_fn_;
ProcessBuildBatchFn process_build_batch_fn_level0_;
typedef bool (*InsertBatchFn)(Partition*, TPrefetchMode::type, HashTableCtx*, RowBatch*,
const std::vector<BufferedTupleStream::FlatRowPtr>&, Status*);
/// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled.
InsertBatchFn insert_batch_fn_;
InsertBatchFn insert_batch_fn_level0_;
};
}
#endif