| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #ifndef IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H |
| #define IMPALA_EXEC_PARTITIONED_HASH_JOIN_BUILDER_H |
| |
| #include <deque> |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| #include <boost/scoped_ptr.hpp> |
| |
| #include "common/atomic.h" |
| #include "codegen/codegen-fn-ptr.h" |
| #include "common/object-pool.h" |
| #include "common/status.h" |
| #include "exec/filter-context.h" |
| #include "exec/hash-table.h" |
| #include "exec/join-builder.h" |
| #include "exec/join-op.h" |
| #include "runtime/buffered-tuple-stream.h" |
| #include "runtime/bufferpool/buffer-pool.h" |
| #include "runtime/bufferpool/suballocator.h" |
| #include "runtime/reservation-manager.h" |
| |
| namespace impala { |
| |
| class CyclicBarrier; |
| class PhjBuilder; |
| class PhjBuilderPartition; |
| class RowDescriptor; |
| class RuntimeState; |
| class ScalarExpr; |
| class ScalarExprEvaluator; |
| |
| /// Method signature of the codegened version of InsertBatch(). |
| typedef bool (*InsertBatchFn)(PhjBuilderPartition*, TPrefetchMode::type, HashTableCtx*, |
| RowBatch*, const std::vector<BufferedTupleStream::FlatRowPtr>&, Status*); |
| |
| /// Partitioned Hash Join Builder Config class. This has a few extra methods to be used |
| /// directly by the PartitionedHashJoinPlanNode. Since it is expected to only be created |
| /// and used by PartitionedHashJoinPlanNode only, the DataSinkConfig::Init() and |
| /// DataSinkConfig::CreateSink() are not implemented for it. |
| class PhjBuilderConfig : public JoinBuilderConfig { |
| public: |
| DataSink* CreateSink(RuntimeState* state) const override; |
| |
| /// Creates a PhjBuilder for embedded use within a PartitionedHashJoinNode. |
| PhjBuilder* CreateSink(BufferPool::ClientHandle* buffer_pool_client, |
| int64_t spillable_buffer_size, int64_t max_row_buffer_size, |
| RuntimeState* state) const; |
| |
| /// Creates a PhjBuilderConfig for embedded use within a PartitionedHashJoinNode. |
| /// Creates the object in the state's object pool. To be used only by |
| /// PartitionedHashJoinPlanNode. |
| static Status CreateConfig(FragmentState* state, int join_node_id, |
| TJoinOp::type join_op, const RowDescriptor* build_row_desc, |
| const std::vector<TEqJoinCondition>& eq_join_conjuncts, |
| const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed, |
| PhjBuilderConfig** sink); |
| |
| void Close() override; |
| void Codegen(FragmentState* state) override; |
| |
| ~PhjBuilderConfig() override {} |
| |
| /// Expressions over input rows for hash table build. |
| std::vector<ScalarExpr*> build_exprs_; |
| |
| /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS |
| /// NOT DISTINCT FROM, rather than equality. This is the case when IS NOT DISTINCT FROM |
| /// is explicitly used as a join predicate or when joining Iceberg equality delete |
| /// files to data files. |
| /// Set in InitExprsAndFilters() and constant thereafter. |
| std::vector<bool> is_not_distinct_from_; |
| |
| /// Expressions for evaluating input rows for insertion into runtime filters. |
| /// Only includes exprs for filters produced by this builder. |
| std::vector<ScalarExpr*> filter_exprs_; |
| |
| /// The runtime filter descriptors of filters produced by this builder. |
| vector<TRuntimeFilterDesc> filter_descs_; |
| |
| /// Seed used for hashing rows. Must match seed used in the PartitionedHashJoinNode. |
| uint32_t hash_seed_; |
| |
| /// Resource information sent from the frontend. Non-null if this is a separate join |
| /// build. |
| const TBackendResourceProfile* resource_profile_ = nullptr; |
| |
| /// Used for codegening hash table specific methods and to create the corresponding |
| /// instance of HashTableCtx. |
| const HashTableConfig* hash_table_config_; |
| |
| /// For the below codegen'd functions, xxx_fn_level0_ uses CRC hashing when available |
| /// and is used when the partition level is 0, otherwise xxx_fn_ uses murmur hash and is |
| /// used for subsequent levels. |
| typedef Status (*ProcessBuildBatchFn)( |
| PhjBuilder*, RowBatch*, HashTableCtx*, bool build_filters, bool is_null_aware); |
| /// Jitted ProcessBuildBatch function pointers. NULL if codegen is disabled. |
| CodegenFnPtr<ProcessBuildBatchFn> process_build_batch_fn_; |
| CodegenFnPtr<ProcessBuildBatchFn> process_build_batch_fn_level0_; |
| |
| /// Jitted Partition::InsertBatch() function pointers. NULL if codegen is disabled. |
| |
| /// Method signature of the codegened version of Partition::InsertBatch(). |
| CodegenFnPtr<InsertBatchFn> insert_batch_fn_; |
| CodegenFnPtr<InsertBatchFn> insert_batch_fn_level0_; |
| |
| protected: |
| /// Initialization for separate sink. |
| Status Init(const TDataSink& tsink, const RowDescriptor* input_row_desc, |
| FragmentState* state) override; |
| |
| private: |
| /// Helper method used by CreateConfig() to initialize embedded builder. |
| /// 'tsink' does not need to be initialized by the caller - all values to be used are |
| /// passed in as arguments and this function fills in required fields in 'tsink'. |
| Status Init(FragmentState* state, int join_node_id, TJoinOp::type join_op, |
| const RowDescriptor* build_row_desc, |
| const std::vector<TEqJoinCondition>& eq_join_conjuncts, |
| const std::vector<TRuntimeFilterDesc>& filters, uint32_t hash_seed, |
| TDataSink* tsink); |
| |
| /// Initializes the build and filter expressions, creates a copy of the filter |
| /// descriptors that will be generated by this sink and initializes the hash table |
| /// config object. |
| Status InitExprsAndFilters(FragmentState* state, |
| const std::vector<TEqJoinCondition>& eq_join_conjuncts, |
| const std::vector<TRuntimeFilterDesc>& filters); |
| |
| /// Codegen processing build batches. Identical signature to ProcessBuildBatch(). |
| /// Returns non-OK status if codegen was not possible. |
| Status CodegenProcessBuildBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn, |
| llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn, |
| llvm::Function* insert_filters_fn); |
| |
| /// Codegen inserting batches into a partition's hash table. Identical signature to |
| /// Partition::InsertBatch(). Returns non-OK if codegen was not possible. |
| Status CodegenInsertBatch(LlvmCodeGen* codegen, llvm::Function* hash_fn, |
| llvm::Function* murmur_hash_fn, llvm::Function* eval_row_fn, |
| TPrefetchMode::type prefetch_mode); |
| |
| /// Codegen inserting rows into runtime filters. Identical signature to |
| /// InsertRuntimeFilters(). Returns non-OK if codegen was not possible. |
| Status CodegenInsertRuntimeFilters(LlvmCodeGen* codegen, |
| const std::vector<ScalarExpr*>& filter_exprs, llvm::Function** fn); |
| }; |
| |
/// States of the spilling hash join algorithm. The top-level algorithm, and where each
/// of these states fits in it, is explained in partitioned-hash-join-node.h.
enum class HashJoinState {
  /// The build (right) child's input is being partitioned into the builder's hash
  /// partitions.
  PARTITIONING_BUILD,

  /// The probe (left) child's input is being processed: hash tables are probed and
  /// probe rows are spilled into 'probe_hash_partitions_' where necessary.
  PARTITIONING_PROBE,

  /// The spilled probe rows of one spilled partition ('input_partition_') that fits
  /// in memory are being processed.
  PROBING_SPILLED_PARTITION,

  /// The build rows of one spilled partition ('input_partition_') are being
  /// repartitioned into the builder's hash partitions.
  /// The counterpart of PARTITIONING_BUILD when reading from a spilled partition.
  REPARTITIONING_BUILD,

  /// The repartitioned hash partitions of one spilled partition ('input_partition_')
  /// are being probed with that partition's probe rows.
  /// The counterpart of PARTITIONING_PROBE when reading from a spilled partition.
  REPARTITIONING_PROBE,
};
| |
| /// A partition containing a subset of build rows. |
| /// |
| /// A partition may contain two data structures: the build rows and optionally a hash |
| /// table built over the build rows. Building the hash table requires all build rows to |
| /// be pinned in memory. The build rows are kept in memory if all partitions fit in |
| /// memory, but can be unpinned and spilled to disk to free up memory. Reading or |
| /// writing the unpinned rows requires a single read or write buffer. If the unpinned |
| /// rows are not being read or written, they can be completely unpinned, requiring no |
| /// buffers. |
| /// |
| /// The build input is first partitioned by hash function level 0 into the level 0 |
| /// partitions. Then, if the join spills and the size of a level n partition is too |
| /// large to fit in memory, the partition's rows can be repartitioned with the level |
| /// n + 1 hash function into the level n + 1 partitions. |
| class PhjBuilderPartition { |
| public: |
| PhjBuilderPartition(RuntimeState* state, PhjBuilder* parent, int level); |
| ~PhjBuilderPartition(); |
| |
| using PartitionId = int; |
| |
| /// Close the partition and attach resources to 'batch' if non-NULL or free the |
| /// resources if 'batch' is NULL. Idempotent. |
| void Close(RowBatch* batch); |
| |
| /// Returns the estimated byte size of the in-memory data structures for this |
| /// partition. This includes all build rows and the hash table. |
| int64_t EstimatedInMemSize() const; |
| |
| /// Pins the build tuples for this partition and constructs the hash table from it. |
| /// Build rows cannot be added after calling this. If the build rows could not be |
| /// pinned or the hash table could not be built due to memory pressure, sets *built |
| /// to false and returns OK. Returns an error status if any other error is |
| /// encountered. |
| Status BuildHashTable(bool* built) WARN_UNUSED_RESULT; |
| |
| /// Spills this partition, the partition's stream is unpinned with 'mode' and |
| /// its hash table is destroyed if it was built. Calling with 'mode' UNPIN_ALL |
| /// unpins all pages and frees all buffers associated with the partition so that |
| /// the partition does not use any reservation. Calling with 'mode' |
| /// UNPIN_ALL_EXCEPT_CURRENT may leave the read or write pages of the unpinned stream |
| /// pinned and therefore using reservation. If the partition was previously |
| /// spilled with mode UNPIN_ALL_EXCEPT_CURRENT, then calling Spill() again with |
| /// UNPIN_ALL may release more reservation by unpinning the read or write page |
| /// in the stream. |
| Status Spill(BufferedTupleStream::UnpinMode mode) WARN_UNUSED_RESULT; |
| |
| std::string DebugString(); |
| |
| bool ALWAYS_INLINE IsClosed() const { return build_rows_ == nullptr; } |
| BufferedTupleStream* ALWAYS_INLINE build_rows() { return build_rows_.get(); } |
| HashTable* ALWAYS_INLINE hash_tbl() const { return hash_tbl_.get(); } |
| PartitionId id() const { return id_; } |
| bool ALWAYS_INLINE is_spilled() const { return is_spilled_; } |
| int ALWAYS_INLINE level() const { return level_; } |
| /// Return true if the partition can be spilled - is not closed and is not spilled. |
| bool CanSpill() const { return !IsClosed() && !is_spilled(); } |
| int64_t num_spilled_probe_rows() const { return num_spilled_probe_rows_.Load(); } |
| |
| /// Increment the number of spilled probe rows. Thread-safe. |
| void IncrementNumSpilledProbeRows(int64_t count) { |
| num_spilled_probe_rows_.Add(count); |
| } |
| |
| private: |
| /// Inserts each row in 'batch' into 'hash_tbl_' using 'ctx'. 'flat_rows' is an array |
| /// containing the rows in the hash table's tuple stream. |
| /// 'prefetch_mode' is the prefetching mode in use. If it's not PREFETCH_NONE, hash |
| /// table buckets which the rows hashes to will be prefetched. This parameter is |
| /// replaced with a constant during codegen time. This function may be replaced with |
| /// a codegen'd version. Returns true if all rows in 'batch' are successfully |
| /// inserted and false otherwise. If inserting failed, 'status' indicates why it |
| /// failed: if 'status' is ok, inserting failed because not enough reservation |
| /// was available and if 'status' is an error, inserting failed because of that error. |
| bool InsertBatch(TPrefetchMode::type prefetch_mode, HashTableCtx* ctx, |
| RowBatch* batch, const std::vector<BufferedTupleStream::FlatRowPtr>& flat_rows, |
| Status* status); |
| |
| const PhjBuilder* parent_; |
| |
| /// Id for this partition that is unique within the builder. |
| const PartitionId id_; |
| |
| /// True if this partition is spilled. |
| bool is_spilled_; |
| |
| /// How many times rows in this partition have been repartitioned. Partitions created |
| /// from the node's children's input is level 0, 1 after the first repartitioning, |
| /// etc. |
| const int level_; |
| |
| /// The hash table for this partition. |
| boost::scoped_ptr<HashTable> hash_tbl_; |
| |
| /// Stream of build tuples in this partition. Initially owned by this object but |
| /// transferred to the parent exec node (via the row batch) when the partition |
| /// is closed. If NULL, ownership has been transferred and the partition is closed. |
| std::unique_ptr<BufferedTupleStream> build_rows_; |
| |
| /// The number of spilled probe rows associated with this partition. Updated in |
| /// DoneProbingHashPartitions(). |
| AtomicInt64 num_spilled_probe_rows_{0}; |
| }; |
| |
| /// The build side for the PartitionedHashJoinNode. Build-side rows are hash-partitioned |
| /// into PARTITION_FANOUT partitions, with partitions spilled if the full build side |
| /// does not fit in memory. Spilled partitions can be repartitioned with a different |
| /// hash function per level of repartitioning. |
| /// |
| /// The builder owns the hash tables and build row streams. The builder first does the |
| /// level 0 partitioning of build rows. After FlushFinal() the builder has produced some |
| /// in-memory partitions and some spilled partitions. The in-memory partitions have hash |
| /// tables and the spilled partitions have memory reserved for a probe-side stream with |
| /// one write buffer, which is sufficient to spill the partition's probe rows to disk |
| /// without allocating additional buffers. |
| /// |
| /// After this initial partitioning, the join node probes the in-memory hash partitions, |
| /// after which it calls DoneProbingHashPartitions(). Then the spilling algorithm can |
| /// commence, with the join node calling BeginSpilledProbe() and DoneProbing*() methods |
| /// until all spilled partitions are processed. |
| /// |
| /// Both the PartitionedHashJoinNode and the builder share memory reservation. Different |
| /// stages of the spilling algorithm require different mixes of build and probe buffers |
| /// and hash tables, so we can share the reservation to minimize the combined memory |
| /// requirement. Memory for probe-side buffers is reserved in the builder then handed |
| /// off to the probe side to implement this reservation sharing. When the builder is |
| /// integrated into the join node, this is implemented with a shared BufferPool client. |
| /// When the build is separate, reservation is transferred between the builder's and the |
| /// join node's clients as needed. The probe client is passed into various methods as |
| /// 'probe_client'. If the join is integrated, 'probe_client' must be the same client |
| /// as was passed into the constructor. If the join is separate, 'probe_client' must |
| /// be a different client. |
| /// |
| /// The full hash join algorithm is documented in PartitionedHashJoinNode. |
| /// |
| /// Shared Build |
| /// ------------ |
| /// A separate builder can be shared between multiple PartitionedHashJoinNodes. The |
| /// spilling hash join algorithm mutates the state of the builder between phases, so |
| /// requires synchronization between the probe threads executing PartitionedHashJoinNode |
| /// that are reading that state. |
| /// |
| /// The algorithm (specifically the HashJoinState state machine) is executed in lockstep |
| /// across all probe threads with each probe thread working on the same set of partitions |
| /// at the same time. A CyclicBarrier, 'probe_barrier_', is used for synchronization. |
| /// At each state transition where the builder state needs to be mutated, all probe |
| /// threads must arrive at the barrier before proceeding. The state transition is executed |
| /// serially by a single thread before all threads proceed. All probe threads go through |
| /// the same state transitions in lockstep, even if they have no work to do. E.g. if a |
| /// probe thread has zero rows remaining in its spilled partitions, it still needs to |
| /// wait for the other probe threads. |
| /// |
| /// Not all join ops can be used with a shared build. For example, RIGHT_OUTER_JOIN is |
| /// not supported currently, in part because it mutates the hash table during probing to |
| /// track matches, but also because hash table matches would need to be broadcast across |
| /// all instances within the query, not just the backend. |
| class PhjBuilder : public JoinBuilder { |
| public: |
| friend class PhjBuilderPartition; |
| |
| /// Number of initial partitions to create. Must be a power of two. |
| static const int PARTITION_FANOUT = 16; |
| |
| /// Needs to be log2(PARTITION_FANOUT). |
| static const int NUM_PARTITIONING_BITS = 4; |
| |
| /// Maximum number of times we will repartition. The maximum build table we |
| /// can process is: |
| /// MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH). With a (low) 1GB |
| /// limit and 64 fanout, we can support 256TB build tables in the case where |
| /// there is no skew. |
| /// In the case where there is skew, repartitioning is unlikely to help (assuming a |
| /// reasonable hash function). |
| /// Note that we need to have at least as many SEED_PRIMES in HashTableCtx. |
| /// TODO: we can revisit and try harder to explicitly detect skew. |
| static const int MAX_PARTITION_DEPTH = 16; |
| |
| using PartitionId = int; |
| |
| // Constructor for separate join build. |
| PhjBuilder( |
| TDataSinkId sink_id, const PhjBuilderConfig& sink_config, RuntimeState* state); |
| // Constructor for join builder embedded in a PartitionedHashJoinNode. Shares |
| // 'buffer_pool_client' with the parent node and inherits buffer sizes from |
| // the parent node. |
| PhjBuilder(const PhjBuilderConfig& sink_config, |
| BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size, |
| int64_t max_row_buffer_size, RuntimeState* state); |
| ~PhjBuilder(); |
| |
| /// Implementations of DataSink interface methods. |
| virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override; |
| virtual Status Open(RuntimeState* state) override; |
| virtual Status Send(RuntimeState* state, RowBatch* batch) override; |
| virtual Status FlushFinal(RuntimeState* state) override; |
| virtual void Close(RuntimeState* state) override; |
| |
| ///////////////////////////////////////// |
| // The following functions are used only by PartitionedHashJoinNode. |
| ///////////////////////////////////////// |
| |
| /// Reset the builder to the same state as it was in after calling Open(). |
| /// Not valid to call on a separate join build. |
| void Reset(RowBatch* row_batch); |
| |
| /// Represents a set of hash partitions to be handed off to the probe side. |
| struct HashPartitions { |
| HashPartitions() { Reset(); } |
| HashPartitions(int level, |
| const std::vector<std::unique_ptr<PhjBuilderPartition>>* hash_partitions, |
| bool non_empty_build) |
| : level(level), |
| hash_partitions(hash_partitions), |
| non_empty_build(non_empty_build) {} |
| |
| void Reset() { |
| level = -1; |
| hash_partitions = nullptr; |
| non_empty_build = false; |
| } |
| |
| // The partitioning level of this set of partitions. -1 indicates that this is |
| // invalid. |
| int level; |
| |
| // The current set of hash partitions. Always contains PARTITION_FANOUT partitions. |
| // The partitions may be in-memory, spilled, or closed. Valid until |
| // DoneProbingHashPartitions() is called. |
| const std::vector<std::unique_ptr<PhjBuilderPartition>>* hash_partitions; |
| |
| // True iff the build side had at least one row in a partition. |
| bool non_empty_build; |
| }; |
| |
| /// Get hash partitions and reservation for the initial partitioning of the probe |
| /// side. Only valid to call once per PartitionedHashJoinNode when in state |
| /// PARTITIONING_PROBE (i.e. once if the build is not shared). |
| /// When this function returns successfully, 'probe_client' will have enough |
| /// reservation for a write buffer for each spilled partition. |
| /// Return the current set of hash partitions in 'partitions'. |
| Status BeginInitialProbe( |
| BufferPool::ClientHandle* probe_client, HashPartitions* partitions); |
| |
| /// Pick a spilled partition to process (returned in *input_partition) and |
| /// prepare to probe it. Builds a hash table over *input_partition |
| /// if it fits in memory. Otherwise repartition it into PARTITION_FANOUT |
| /// new partitions. |
| /// |
| /// When this function returns successfully, 'probe_client' will have enough |
| /// reservation for a read buffer for the input probe stream and, if repartitioning, |
| /// a write buffer for each spilled partition. |
| /// |
| /// If repartitioning, creates new hash partitions and repartitions 'partition' into |
| /// PARTITION_FANOUT new partitions with level input_partition->level() + 1. The |
| /// previous hash partitions must have been cleared with DoneProbingHashPartitions(). |
| /// The new hash partitions are returned in 'new_partitions'. |
| /// |
| /// This is a synchronization point for shared join build. The time elapsed during the |
| /// serial execution phase is attributed to the builder. All probe threads must call |
| /// this function before continuing the next phase of the hash join algorithm. |
| Status BeginSpilledProbe(BufferPool::ClientHandle* probe_client, |
| RuntimeProfile* probe_profile, bool* repartitioned, |
| PhjBuilderPartition** input_partition, HashPartitions* new_partitions); |
| |
| /// Called after probing of the hash partitions returned by BeginInitialProbe() or |
| /// BeginSpilledProbe() (when *repartitioned is true) is complete, i.e. all of the |
| /// corresponding probe rows have been processed by PartitionedHashJoinNode. The number |
| /// of spilled probe rows per partition must be passed in via 'num_spilled_probe_rows' |
| /// so that the builder can determine whether a spilled partition needs to be retained. |
| /// Appends in-memory partitions that may contain build rows to output to |
| /// 'output_partitions' for build modes like right outer join that output unmatched |
| /// rows. Close other in-memory partitions, attaching any tuple data to 'batch' if |
| /// 'batch' is non-NULL. Closes spilled partitions if no more processing is needed. |
| /// |
| /// The reservation that was transferred to 'probe_client' in Begin*Probe() is |
| /// transferred back to the builder. |
| /// |
| /// Returns an error if an error was encountered or if the query was cancelled. |
| /// |
| /// This is a synchronization point for shared join build. The time elapsed during the |
| /// serial execution phase is attributed to the builder. All probe threads must call |
| /// this function before continuing the next phase of the hash join algorithm. |
| Status DoneProbingHashPartitions(const int64_t num_spilled_probe_rows[PARTITION_FANOUT], |
| BufferPool::ClientHandle* probe_client, RuntimeProfile* probe_profile, |
| std::deque<std::unique_ptr<PhjBuilderPartition>>* output_partitions, |
| RowBatch* batch); |
| |
| /// Called after probing of a single spilled partition returned by |
| /// BeginSpilledProbe() when *repartitioned is false. |
| /// |
| /// If the join op requires outputting unmatched build rows and the partition |
| /// may have build rows to return, it is appended to 'output_partitions'. Partitions |
| /// returned via 'output_partitions' are ready for the caller to read from - either |
| /// they are in-memory with a hash table built or have build_rows() prepared for |
| /// reading. |
| /// |
| /// If no build rows need to be returned, closes the build partition and attaches any |
| /// tuple data to 'batch' if 'batch' is non-NULL. |
| /// |
| /// The reservation that was transferred to 'probe_client' in Begin*Probe() is |
| /// transferred back to the builder. |
| /// |
| /// Returns an error if an error was encountered or if the query was cancelled. |
| /// |
| /// This is a synchronization point for shared join build. The time elapsed during the |
| /// serial execution phase is attributed to the builder. All probe threads must call |
| /// this function before continuing the next phase of the hash join algorithm. |
| Status DoneProbingSinglePartition(BufferPool::ClientHandle* probe_client, |
| RuntimeProfile* probe_profile, |
| std::deque<std::unique_ptr<PhjBuilderPartition>>* output_partitions, |
| RowBatch* batch); |
| |
| /// Called to begin probing of the null-aware partition, after all other partitions |
| /// have been fully processed. This should only be called if there are build rows in the |
| /// null-aware partition. This pins the null-aware build rows in memory and allows all |
| /// probe threads to access those rows in a read-only manner. |
| /// |
| /// Returns an error if an error was encountered or if the query was cancelled. |
| /// |
| /// This is a synchronization point for shared join build. All probe threads must |
| /// call this function before continuing the next phase of the hash join algorithm. |
| Status BeginNullAwareProbe(); |
| |
| /// Called after probing of the null-aware build partition is complete. |
| /// |
| /// This is a synchronization point for shared join build. All probe threads must |
| /// call this function before continuing the next phase of the hash join algorithm. |
| Status DoneProbingNullAwarePartition(); |
| |
| /// True if the hash table may contain rows with one or more NULL join keys. This |
| /// depends on the join type, passed in via 'join_op' and the 'is_not_distinct_from' |
| /// flags of the equijoin conjuncts, which are passed in via 'is_not_distinct_from'. |
| static bool HashTableStoresNulls( |
| TJoinOp::type join_op, const std::vector<bool>& is_not_distinct_from); |
| |
| /// Returns 'bytes' of reservation to the builder from 'probe_client'. |
| /// Called by the probe side to return surplus reservation. This is usually handled by |
| /// the above methods, but if an error occurred during execution, the probe may still |
| /// have some surplus reservation. |
| /// Must only be called if this is a separate build. |
| void ReturnReservation(BufferPool::ClientHandle* probe_client, int64_t bytes); |
| |
| /// Safe to call from PartitionedHashJoinNode threads during the probe phase. |
| HashJoinState state() const { return state_; } |
| |
| /// Accessor to allow PartitionedHashJoinNode to access 'null_aware_partition_'. |
| /// Generally the PartitionedHashJoinNode should only access this partition in |
| /// a read-only manner. |
| inline PhjBuilderPartition* null_aware_partition() const { |
| return null_aware_partition_.get(); |
| } |
| |
| /// Thread-safe. |
| HashTableStatsProfile* ht_stats_profile() const { return ht_stats_profile_.get(); } |
| |
| std::string DebugString() const; |
| |
| /// Unregisters one probe thread from the barrier |
| void UnregisterThreadFromBarrier() const; |
| |
| /// Computes the minimum reservation required to execute the spilling partitioned |
| /// hash algorithm successfully for any input size (assuming enough disk space is |
| /// available for spilled rows). This includes buffers used by the build side, |
| /// the probe side, and buffers that are shared between build and probe. |
| /// We need one output buffer per partition to partition the build or probe side. We |
| /// need one additional buffer for the input while repartitioning the build or probe. |
| /// For NAAJ, we need an additional buffer for 'null_aware_partition_' on the build |
| /// side and two additional buffers for 'null_aware_probe_partition_' and |
| /// 'null_probe_rows_' on the probe side. |
| /// Returns a pair with the probe and build reservation requirements. |
| std::pair<int64_t, int64_t> MinReservation() const { |
| // Must be kept in sync with HashJoinNode.computeNodeResourceProfile() in fe. |
| int num_reserved_build_buffers = PARTITION_FANOUT + 1; |
| int64_t probe_reservation = 0; |
| if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { |
| num_reserved_build_buffers += 1; |
| // One of the NAAJ buffers needs to fit the max row, since we write/read |
| // one stream at a time. If the build is integrated, we already have a max-sized |
| // buffer accounted for in the build reservation. |
| probe_reservation = is_separate_build_ ? |
| max_row_buffer_size_ + spillable_buffer_size_ : spillable_buffer_size_ * 2; |
| } |
| // Two of the build buffers must fit the maximum row for use as read and write |
| // buffers while repartitioning a stream. |
| return {probe_reservation, |
| spillable_buffer_size_ * (num_reserved_build_buffers - 2) |
| + max_row_buffer_size_ * 2}; |
| } |
| |
| /// Class name in LLVM IR. |
| static const char* LLVM_CLASS_NAME; |
| |
| private: |
| /// Updates 'state_' to 'next_state', logging the transition. |
| void UpdateState(HashJoinState next_state); |
| |
| /// Returns the string representation of 'state'. |
| static std::string PrintState(HashJoinState state); |
| |
| /// Create and initialize a set of hash partitions for partitioning level 'level'. |
| /// The previous hash partitions must have been cleared with DoneProbing(). |
| /// After calling this, batches are added to the new partitions by calling Send(). |
| Status CreateHashPartitions(int level) WARN_UNUSED_RESULT; |
| |
| /// Create a new partition and prepare it for writing. Returns an error if initializing |
| /// the partition or allocating the write buffer fails. |
| Status CreateAndPreparePartition(int level, |
| std::unique_ptr<PhjBuilderPartition>* partition); |
| |
| /// Reads the rows in build_batch and partitions them into hash_partitions_. If |
| /// 'build_filters' is true, runtime filters are populated. 'is_null_aware' is |
| /// set to true if the join type is a null aware join. |
| Status ProcessBuildBatch( |
| RowBatch* build_batch, HashTableCtx* ctx, bool build_filters, bool is_null_aware); |
| |
| /// Helper method for Send() that does the actual work apart from updating the |
| /// counters. Also used by RepartitionBuildInput(). |
| Status AddBatch(RowBatch* build_batch); |
| |
| /// Helper method for FlushFinal() that does the actual work. Also used by |
| /// RepartitionBuildInput(). |
| Status FinalizeBuild(RuntimeState* state); |
| |
| /// Append 'row' to 'stream'. In the common case, appending the row to the stream |
| /// immediately succeeds. Otherwise this function falls back to the slower path of |
| /// AppendRowStreamFull(), which may spill partitions to free memory. Returns false |
| /// and sets 'status' if it was unable to append the row, even after spilling |
| /// partitions. This odd return convention is used to avoid emitting unnecessary code |
| /// for ~Status in perf-critical code. |
| bool AppendRow( |
| BufferedTupleStream* stream, TupleRow* row, Status* status) WARN_UNUSED_RESULT; |
| |
| /// Slow path for AppendRow() above. It is called when the stream has failed to append |
| /// the row. We need to find more memory by either switching to IO-buffers, in case the |
| /// stream still uses small buffers, or spilling a partition. Returns false and sets |
| /// 'status' if it was unable to append the row, even after spilling partitions. |
| bool AppendRowStreamFull(BufferedTupleStream* stream, TupleRow* row, |
| Status* status) noexcept WARN_UNUSED_RESULT; |
| |
| /// Frees memory by spilling one of the hash partitions. The 'mode' argument is passed |
| /// to the Spill() call for the selected partition. The current policy is to spill the |
| /// null-aware partition first (if a NAAJ), then the largest partition. Returns non-ok |
| /// status if we couldn't spill a partition. If 'spilled_partition' is non-NULL, it is |
| /// set to the partition that was spilled. |
| Status SpillPartition(BufferedTupleStream::UnpinMode mode, |
| PhjBuilderPartition** spilled_partition = nullptr) WARN_UNUSED_RESULT; |
| |
| /// Tries to build hash tables for all unspilled hash partitions. Called after |
| /// FlushFinal() when all build rows have been partitioned and added to the appropriate |
| /// streams. If the hash table could not be built for a partition, the partition is |
| /// spilled (with all build blocks unpinned) and memory reservation is set aside |
| /// for a write buffer for the output probe streams, and, if this input is a spilled |
| /// partition, a read buffer for the input probe stream. |
| /// |
| /// When this function returns successfully, each partition is in one of these states: |
| /// 1. closed. No probe partition is created and the build partition is closed. No |
| /// probe stream memory is reserved for this partition. |
| /// 2. in-memory. The build rows are pinned and a hash table has been built. No |
| /// probe stream memory is reserved for this partition. |
| /// 3. spilled. The build rows are fully unpinned and the probe stream is prepared. |
| /// Memory for a probe stream write buffer is reserved for this partition. |
| /// |
| /// 'next_state' is the state that this will transition into after building the hash |
| /// tables, either PARTITIONING_PROBE or REPARTITIONING_PROBE. |
| Status BuildHashTablesAndReserveProbeBuffers(HashJoinState next_state); |
| |
| /// Ensures that 'probe_stream_reservation_' has enough reservation for a stream per |
| /// spilled partition in 'hash_partitions_', plus one for the input stream if the input |
| /// is a spilled partition (determined by 'next_state' - either PARTITIONING_PROBE or |
| /// REPARTITIONING_PROBE). If 'num_probe_threads_' is > 1, reserves this amount for each |
| /// probe thread. May spill additional partitions until it can free enough |
| /// reservation. Returns an error if an error is encountered or if it runs out of |
| /// partitions to spill. |
| Status ReserveProbeBuffers(HashJoinState next_state); |
| |
| /// Returns the number of partitions in 'partitions' that are spilled. |
| static int GetNumSpilledPartitions( |
| const std::vector<std::unique_ptr<PhjBuilderPartition>>& partitions); |
| |
| /// Transfer reservation for probe streams to 'probe_client'. Memory for one stream was |
| /// reserved per spilled partition in FlushFinal(), plus the input stream if the input |
| /// partition was spilled. |
| /// This is safe to call from multiple probe threads concurrently. |
| Status TransferProbeStreamReservation(BufferPool::ClientHandle* probe_client); |
| |
| /// Calculates the amount of memory per probe thread/join node instance to be |
| /// transferred for probe streams when probing in the given 'state'. Depends on |
| /// 'hash_partitions_', 'spilled_partitions_' and 'spillable_buffer_size_'. |
| int64_t CalcProbeStreamReservation(HashJoinState state) const; |
| |
| /// The serial part of BeginSpilledProbe() that is executed by a single thread. |
| Status BeginSpilledProbeSerial(); |
| |
| /// Creates new hash partitions and repartitions 'input_partition' into PARTITION_FANOUT |
| /// new partitions with level input_partition->level() + 1. The previous hash partitions |
| /// must have been cleared with ClearHashPartitions(). This function reserves enough |
| /// memory for a read buffer for the input probe stream and a write buffer for each |
| /// spilled partition after repartitioning. |
| Status RepartitionBuildInput(PhjBuilderPartition* input_partition) WARN_UNUSED_RESULT; |
| |
| /// Returns the largest build row count out of the current hash partitions. |
| int64_t LargestPartitionRows() const; |
| |
| /// Helper for DoneProbingHashPartitions() that processes and cleans up the hash |
| /// partitions. |
| void CleanUpHashPartitions( |
| std::deque<std::unique_ptr<PhjBuilderPartition>>* output_partitions, |
| RowBatch* batch); |
| |
| /// Helper for DoneProbingSinglePartition() that processes and cleans up the current |
| /// spilled partition. |
| void CleanUpSinglePartition( |
| std::deque<std::unique_ptr<PhjBuilderPartition>>* output_partitions, |
| RowBatch* batch); |
| |
| /// The serial part of BeginNullAwareProbe() that is executed by a single thread. |
| Status BeginNullAwareProbeSerial(); |
| |
| /// Close the null-aware partition (if there is one) and set it to NULL. |
| void CloseNullAwarePartition(); |
| |
| /// Calls Close() on every partition, deletes them, and cleans up any pointers that |
| /// may reference them. If 'row_batch' is not NULL, transfers the ownership of all |
| /// row-backing resources to it. |
| void CloseAndDeletePartitions(RowBatch* row_batch); |
| |
| /// For each filter in filters_, allocate a runtime_filter from the fragment-local |
| /// RuntimeFilterBank and store it in runtime_filters_ to populate during the build |
| /// phase. |
| void AllocateRuntimeFilters(); |
| |
| /// Iterates over the runtime filters and inserts each row into each filter. |
| /// This is replaced at runtime with code generated by CodegenInsertRuntimeFilters(). |
| void InsertRuntimeFilters(FilterContext filter_ctxs[], TupleRow* build_row) noexcept; |
| |
| /// Publish the runtime filters to the fragment-local RuntimeFilterBank. |
| /// 'num_build_rows' is used to determine whether the computed filters have an |
| /// unacceptably high false-positive rate. |
| void PublishRuntimeFilters(int64_t num_build_rows); |
| |
| /// Determine the usefulness of min/max filters in the context of column min/max stats. |
| /// Set AlwaysTrue to true for each filter that is not useful. Called at the end of AddBatch(). |
| void DetermineUsefulnessForMinmaxFilters(); |
| |
| RuntimeState* const runtime_state_; |
| |
| /// Seed used for hashing rows. Must match the seed used in the PartitionedHashJoinNode. |
| const uint32_t hash_seed_; |
| |
| /// Pool for objects with same lifetime as builder. |
| ObjectPool obj_pool_; |
| |
| /// Resource information sent from the frontend. Non-null if this is a separate join |
| /// build. |
| const TBackendResourceProfile* const resource_profile_; |
| |
| /// Wraps the buffer pool client. Only used if this is a separate build sink. The node's |
| /// minimum reservation is claimed in Open(). After this, the client must hold onto |
| /// at least the minimum reservation so that it can be returned to the initial |
| /// reservations pool in Close(). |
| ReservationManager reservation_manager_; |
| |
| /// Client to the buffer pool, used to allocate build partition buffers and hash tables. |
| /// When probing, the spilling algorithm keeps some build partitions in memory while |
| /// using memory for probe buffers for spilled partitions. |
| /// Memory is shared between build and probe in different ways, depending on whether |
| /// this is a separate join build (i.e. 'is_separate_build_' is true). If a separate |
| /// build, this builder has its own buffer pool client, and transfers reservation to |
| /// the probe client when needed. If the builder is embedded in the join node, this |
| /// is just a pointer to the join node's client so no transfer is required. |
| BufferPool::ClientHandle* buffer_pool_client_; |
| |
| /// The default and max buffer sizes to use in the build streams. |
| const int64_t spillable_buffer_size_; |
| const int64_t max_row_buffer_size_; |
| |
| /// Allocator for hash table memory. |
| boost::scoped_ptr<Suballocator> ht_allocator_; |
| |
| /// Expressions over input rows for hash table build. |
| const std::vector<ScalarExpr*>& build_exprs_; |
| |
| /// is_not_distinct_from_[i] is true if and only if the ith equi-join predicate is IS |
| /// NOT DISTINCT FROM, rather than equality. |
| std::vector<bool> is_not_distinct_from_; |
| |
| /// Expressions for evaluating input rows for insertion into runtime filters. |
| /// Only includes exprs for filters produced by this builder. |
| const std::vector<ScalarExpr*>& filter_exprs_; |
| |
| /// List of filters to build. One-to-one correspondence with exprs in 'filter_exprs_'. |
| std::vector<FilterContext> filter_ctxs_; |
| |
| /// Separately cached list of min/max filter contexts populated during |
| /// PhjBuilder::AllocateRuntimeFilters() to speed up |
| /// PhjBuilder::DetermineUsefulnessForMinmaxFilters() where only minmax filters are |
| /// relevant. Contexts in the vector are removed if they are determined to host |
| /// min/max filters that are overlapping with column stats too much. |
| std::vector<FilterContext*> minmax_filter_ctxs_; |
| |
| /// Reference to the hash table config which is a part of the PhjBuilderConfig that was |
| /// used to create this object. It is used to create an instance of the HashTableCtx in |
| /// Prepare(). Not owned. |
| const HashTableConfig& hash_table_config_; |
| |
| /// Used for hash-related functionality, such as evaluating rows and calculating hashes. |
| /// The level is set to the same level as 'hash_partitions_'. |
| boost::scoped_ptr<HashTableCtx> ht_ctx_; |
| |
| /// Counters and profile objects for HashTable stats. |
| std::unique_ptr<HashTableStatsProfile> ht_stats_profile_; |
| |
| /// Total number of partitions created. |
| RuntimeProfile::Counter* partitions_created_ = nullptr; |
| |
| /// The largest fraction (of build side) after repartitioning. This is expected to be |
| /// 1 / PARTITION_FANOUT. A value much larger indicates skew. |
| RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_ = nullptr; |
| |
| /// Level of max partition (i.e. number of repartitioning steps). |
| RuntimeProfile::HighWaterMarkCounter* max_partition_level_ = nullptr; |
| |
| /// Number of partitions that have been spilled. |
| RuntimeProfile::Counter* num_spilled_partitions_ = nullptr; |
| |
| /// Number of partitions that have been repartitioned. |
| RuntimeProfile::Counter* num_repartitions_ = nullptr; |
| |
| /// Time spent partitioning build rows. |
| RuntimeProfile::Counter* partition_build_rows_timer_ = nullptr; |
| |
| /// Time spent building hash tables. |
| RuntimeProfile::Counter* build_hash_table_timer_ = nullptr; |
| |
| /// Number of partitions which had zero probe rows and we therefore didn't build the |
| /// hash table. |
| RuntimeProfile::Counter* num_hash_table_builds_skipped_ = nullptr; |
| |
| /// Time spent repartitioning and building hash tables of any resulting partitions |
| /// that were not spilled. |
| RuntimeProfile::Counter* repartition_timer_ = nullptr; |
| |
| /// Barrier used to synchronize the probe-side threads at synchronization points in the |
| /// partitioned hash join algorithm. Used only when 'num_probe_threads_' > 1. |
| std::unique_ptr<CyclicBarrier> probe_barrier_; |
| |
| /// Cached copy of the min/max filter threshold value to avoid repeated fetch of |
| /// the value from the plan. It remains constant throughout the execution of |
| /// the query. |
| float minmax_filter_threshold_ = 0.0; |
| |
| ///////////////////////////////////////// |
| /// BEGIN: Members that must be Reset() |
| |
| /// State of the partitioned hash join algorithm. See HashJoinState for more |
| /// information. |
| HashJoinState state_ = HashJoinState::PARTITIONING_BUILD; |
| |
| /// If true, the build side has at least one row. |
| /// Set in FlushFinal() and not modified until Reset(). |
| bool non_empty_build_ = false; |
| |
| /// Id to assign to the next partition created. |
| PartitionId next_partition_id_ = 0; |
| |
| /// The current set of partitions that are being built or probed. This vector is |
| /// initialized before partitioning or repartitioning the build input |
| /// and cleared after we've finished probing the partitions. |
| /// This is not used when processing a single spilled partition. |
| std::vector<std::unique_ptr<PhjBuilderPartition>> hash_partitions_; |
| |
| /// Spilled partitions that need further processing. Populated in |
| /// DoneProbingHashPartitions() with the spilled hash partitions. |
| /// |
| /// This is used as a stack to do a depth-first walk of spilled partitions (i.e. more |
| /// finely partitioned partitions are processed first). This allows us to delete spilled |
| /// data and bottom out the recursion earlier. |
| /// |
| /// 'spilled_partitions_.back()' is the spilled partition being processed, if one is |
| /// currently being processed (i.e. between BeginSpilledProbe() and the corresponding |
| /// DoneProbing*() call). |
| std::vector<std::unique_ptr<PhjBuilderPartition>> spilled_partitions_; |
| |
| /// Partition used for null-aware joins. This partition is always processed at the end |
| /// after all build and probe rows are processed. In this partition's 'build_rows_', we |
| /// store all the rows for which 'build_expr_evals_' evaluated over the row returns |
| /// NULL (i.e. it has a NULL on the eq join slot). |
| /// NULL if the join is not null-aware or we are done processing this partition. |
| /// This partition starts off in memory but can be spilled. |
| std::unique_ptr<PhjBuilderPartition> null_aware_partition_; |
| |
| /// Populated during the hash table building phase if any partitions spilled. |
| /// Reservation for one probe stream write buffer per spilled partition is |
| /// saved to be handed off to PartitionedHashJoinNode for use in buffering |
| /// spilled probe rows. |
| /// |
| /// The allocation is done in the builder so that it can divide memory between the |
| /// in-memory build partitions and write buffers based on the size of the partitions |
| /// and available memory. E.g. if all the partitions fit in memory, no write buffers |
| /// need to be allocated, but if some partitions are spilled, more build partitions |
| /// may be spilled to free up memory for write buffers. |
| /// |
| /// Because of this, at the end of the build phase, we always have sufficient memory |
| /// to execute the probe phase of the algorithm without spilling more partitions. |
| /// |
| /// Initialized in Open() and closed in Close(). |
| BufferPool::SubReservation probe_stream_reservation_; |
| |
| /// END: Members that must be Reset() |
| ///////////////////////////////////////// |
| |
| /// Jitted ProcessBuildBatch() function pointers. NULL if codegen is disabled. |
| const CodegenFnPtr<PhjBuilderConfig::ProcessBuildBatchFn>& process_build_batch_fn_; |
| const CodegenFnPtr<PhjBuilderConfig::ProcessBuildBatchFn>& |
| process_build_batch_fn_level0_; |
| |
| /// Jitted PhjBuilderPartition::InsertBatch() pointers. NULL if codegen is disabled. |
| const CodegenFnPtr<InsertBatchFn>& insert_batch_fn_; |
| const CodegenFnPtr<InsertBatchFn>& insert_batch_fn_level0_; |
| }; |
| } // namespace impala |
| #endif |