// blob: ade223bfcfc2563c41d492ffb1041a875be42eb9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <deque>
#include <boost/scoped_ptr.hpp>
#include "exec/exec-node.h"
#include "exec/hash-table.h"
#include "runtime/buffered-tuple-stream.h"
#include "runtime/bufferpool/suballocator.h"
#include "runtime/descriptors.h" // for TupleId
#include "runtime/mem-pool.h"
#include "runtime/string-value.h"
/// Forward declarations of LLVM classes. The namespace is closed here so that the
/// 'impala' namespace that follows is declared at global scope rather than nested
/// inside 'llvm'.
namespace llvm {
class BasicBlock;
class Function;
class Value;
}
namespace impala {
class AggFn;
class AggFnEvaluator;
class CodegenAnyVal;
class LlvmCodeGen;
class LlvmBuilder;
class RowBatch;
class RuntimeState;
struct StringValue;
class Tuple;
class TupleDescriptor;
class SlotDescriptor;
/// Node for doing partitioned hash aggregation.
/// This node consumes the input (which can be from the child(0) or a spilled partition).
/// 1. Each row is hashed and we pick a dst partition (hash_partitions_).
/// 2. If the dst partition is not spilled, we probe into the partitions hash table
/// to aggregate/insert the row.
/// 3. If the partition is already spilled, the input row is spilled.
/// 4. When all the input is consumed, we walk hash_partitions_, put the spilled ones
/// into spilled_partitions_ and the non-spilled ones into aggregated_partitions_.
/// aggregated_partitions_ contain partitions that are fully processed and the result
/// can just be returned. Partitions in spilled_partitions_ need to be repartitioned
/// and we just repeat these steps.
/// Each partition contains these structures:
/// 1) Hash Table for aggregated rows. This contains just the hash table directory
/// structure but not the rows themselves. This is NULL for spilled partitions when
/// we stop maintaining the hash table.
/// 2) MemPool for var-len result data for rows in the hash table. If the aggregate
/// function returns a string, we cannot append it to the tuple stream as that
/// structure is immutable. Instead, when we need to spill, we sweep and copy the
/// rows into a tuple stream.
/// 3) Aggregated tuple stream for rows that are/were in the hash table. This stream
/// contains rows that are aggregated. When the partition is not spilled, this stream
/// is pinned and contains the memory referenced by the hash table.
/// In the case where the aggregate function does not return a string (meaning the
/// size of all the slots is known when the row is constructed), this stream contains
/// all the memory for the result rows and the MemPool (2) is not used.
/// 4) Unaggregated tuple stream. Stream to spill unaggregated rows.
/// Rows in this stream always have child(0)'s layout.
/// Buffering: Each stream and hash table needs to maintain at least one buffer when
/// it is being read or written. The streams for a given agg use a uniform buffer size,
/// except when processing rows larger than that buffer size. In that case, the agg uses
/// BufferedTupleStream's variable buffer size support to handle larger rows up to the
/// maximum row size. Only two max-sized buffers are needed for the agg to spill: one
/// to hold rows being read from a spilled input stream and another for a temporary write
/// buffer when adding a row to an output stream.
/// Two-phase aggregation: we support two-phase distributed aggregations, where
/// pre-aggregations attempt to reduce the size of data before shuffling data across the
/// network to be merged by the merge aggregation node. This exec node supports a
/// streaming mode for pre-aggregations where it maintains a hash table of aggregated
/// rows, but can pass through unaggregated rows (after transforming them into the
/// same tuple format as aggregated rows) when a heuristic determines that it is better
/// to send rows across the network instead of consuming additional memory and CPU
/// resources to expand its hash table. The planner decides whether a given
/// pre-aggregation should use the streaming preaggregation algorithm or the same
/// blocking aggregation algorithm as used in merge aggregations.
/// TODO: make this less of a heuristic by factoring in the cost of the exchange vs the
/// cost of the pre-aggregation.
/// If there are no grouping expressions, there is only a single output row for both
/// preaggregations and merge aggregations. This case is handled separately to avoid
/// building hash tables. There is also no need to do streaming preaggregations.
/// Handling memory pressure: the node uses two different strategies for responding to
/// memory pressure, depending on whether it is a streaming pre-aggregation or not. If
/// the node is a streaming preaggregation, it stops growing its hash table further by
/// converting unaggregated rows into the aggregated tuple format and passing them
/// through. If the node is not a streaming pre-aggregation, it responds to memory
/// pressure by spilling partitions to disk.
/// TODO: Buffer rows before probing into the hash table?
/// TODO: After spilling, we can still maintain a very small hash table just to remove
/// some number of rows (from likely going to disk).
/// TODO: Consider allowing to spill the hash table structure in addition to the rows.
/// TODO: Do we want to insert a buffer before probing into the partition's hash table?
/// TODO: Use a prefetch/batched probe interface.
/// TODO: Return rows from the aggregated_row_stream rather than the HT.
/// TODO: Think about spilling heuristic.
/// TODO: When processing a spilled partition, we have a lot more information and can
/// size the partitions/hash tables better.
/// TODO: Start with unpartitioned (single partition) and switch to partitioning and
/// spilling only if the size gets large, say larger than the LLC.
/// TODO: Simplify or cleanup the various uses of agg_fn_ctx, agg_fn_ctx_, and ctx.
/// There are so many contexts in use that a plain "ctx" variable should never be used.
/// Likewise, it's easy to mixup the agg fn ctxs, there should be a way to simplify this.
/// TODO: support an Init() method with an initial value in the UDAF interface.
class PartitionedAggregationNode : public ExecNode {
PartitionedAggregationNode(ObjectPool* pool,
const TPlanNode& tnode, const DescriptorTbl& descs);
virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
virtual Status Prepare(RuntimeState* state);
virtual void Codegen(RuntimeState* state);
virtual Status Open(RuntimeState* state);
virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos);
virtual Status Reset(RuntimeState* state);
virtual void Close(RuntimeState* state);
static const char* LLVM_CLASS_NAME;
/// Frees local allocations from aggregate_evals_ and agg_fn_evals
virtual Status QueryMaintenance(RuntimeState* state);
virtual void DebugString(int indentation_level, std::stringstream* out) const;
struct Partition;
/// Number of initial partitions to create. Must be a power of 2.
static const int PARTITION_FANOUT = 16;
/// Must be log2(PARTITION_FANOUT).
/// We use the upper bits to pick the partition and lower bits in the HT.
/// TODO: different hash functions here too? We don't need that many bits to pick
/// the partition so this might be okay.
static const int NUM_PARTITIONING_BITS = 4;
/// Maximum number of times we will repartition. The maximum build table we can process
/// (if we have enough scratch disk space) in case there is no skew is:
///   MEM_LIMIT * (PARTITION_FANOUT ^ MAX_PARTITION_DEPTH).
/// In the case where there is skew, repartitioning is unlikely to help (assuming a
/// reasonable hash function).
/// Note that we need to have at least as many SEED_PRIMES in HashTableCtx.
/// TODO: we can revisit and try harder to explicitly detect skew.
static const int MAX_PARTITION_DEPTH = 16;
/// Default initial number of buckets in a hash table.
/// TODO: rethink this ?
static const int64_t PAGG_DEFAULT_HASH_TABLE_SZ = 1024;
/// Codegen doesn't allow for automatic Status variables because then exception
/// handling code is needed to destruct the Status, and our function call substitution
/// doesn't know how to deal with the LLVM IR 'invoke' instruction. Workaround that by
/// placing the Status here so exceptions won't need to destruct it.
/// TODO: fix IMPALA-1948 and remove this.
Status process_batch_status_;
/// Tuple into which Update()/Merge()/Serialize() results are stored.
TupleId intermediate_tuple_id_;
TupleDescriptor* intermediate_tuple_desc_;
/// Row with the intermediate tuple as its only tuple.
/// Construct a new row desc for preparing the build exprs because neither the child's
/// nor this node's output row desc may contain the intermediate tuple, e.g.,
/// in a single-node plan with an intermediate tuple different from the output tuple.
/// Lives in the query state's obj_pool.
RowDescriptor intermediate_row_desc_;
/// Tuple into which Finalize() results are stored. Possibly the same as
/// the intermediate tuple.
TupleId output_tuple_id_;
TupleDescriptor* output_tuple_desc_;
/// Certain aggregates require a finalize step, which is the final step of the
/// aggregate after consuming all input rows. The finalize step converts the aggregate
/// value into its final form. This is true if this node contains aggregate that
/// requires a finalize step.
const bool needs_finalize_;
/// True if this is first phase of a two-phase distributed aggregation for which we
/// are doing a streaming preaggregation.
const bool is_streaming_preagg_;
/// True if any of the evaluators require the serialize step.
bool needs_serialize_;
/// The list of all aggregate operations for this exec node.
std::vector<AggFn*> agg_fns_;
/// Evaluators for each aggregate function and backing MemPool. String data
/// returned by the aggregate functions is allocated via these evaluators.
/// These evaluators are only used for the non-grouping cases. For queries
/// with the group-by clause, each partition will clone these evaluators.
/// TODO: we really need to plumb through CHAR(N) for intermediate types.
std::vector<AggFnEvaluator*> agg_fn_evals_;
boost::scoped_ptr<MemPool> agg_fn_pool_;
/// Exprs used to evaluate input rows
std::vector<ScalarExpr*> grouping_exprs_;
/// Exprs used to insert constructed aggregation tuple into the hash table.
/// All the exprs are simply SlotRefs for the intermediate tuple.
std::vector<ScalarExpr*> build_exprs_;
/// Indices of grouping exprs with var-len string types in grouping_exprs_.
/// We need to do more work for var-len expressions when allocating and spilling rows.
/// All var-len grouping exprs have type string.
std::vector<int> string_grouping_exprs_;
RuntimeState* state_;
/// Allocator for hash table memory.
boost::scoped_ptr<Suballocator> ht_allocator_;
/// MemPool used to allocate memory for when we don't have grouping and don't initialize
/// the partitioning structures, or during Close() when creating new output tuples.
/// For non-grouping aggregations, the ownership of the pool's memory is transferred
/// to the output batch on eos. The pool should not be Reset() to allow amortizing
/// memory allocation over a series of Reset()/Open()/GetNext()* calls.
boost::scoped_ptr<MemPool> mem_pool_;
/// The current partition and iterator to the next row in its hash table that we need
/// to return in GetNext()
Partition* output_partition_;
HashTable::Iterator output_iterator_;
typedef Status (*ProcessBatchNoGroupingFn)(PartitionedAggregationNode*, RowBatch*);
/// Jitted ProcessBatchNoGrouping function pointer. Null if codegen is disabled.
ProcessBatchNoGroupingFn process_batch_no_grouping_fn_;
typedef Status (*ProcessBatchFn)(
PartitionedAggregationNode*, RowBatch*, TPrefetchMode::type, HashTableCtx*);
/// Jitted ProcessBatch function pointer. Null if codegen is disabled.
ProcessBatchFn process_batch_fn_;
typedef Status (*ProcessBatchStreamingFn)(PartitionedAggregationNode*, bool,
TPrefetchMode::type, RowBatch*, RowBatch*, HashTableCtx*, int[PARTITION_FANOUT]);
/// Jitted ProcessBatchStreaming function pointer. Null if codegen is disabled.
ProcessBatchStreamingFn process_batch_streaming_fn_;
/// Time spent processing the child rows
RuntimeProfile::Counter* build_timer_;
/// Total time spent resizing hash tables.
RuntimeProfile::Counter* ht_resize_timer_;
/// Time spent returning the aggregated rows
RuntimeProfile::Counter* get_results_timer_;
/// Total number of hash buckets across all partitions.
RuntimeProfile::Counter* num_hash_buckets_;
/// Total number of partitions created.
RuntimeProfile::Counter* partitions_created_;
/// Level of max partition (i.e. number of repartitioning steps).
RuntimeProfile::HighWaterMarkCounter* max_partition_level_;
/// Number of rows that have been repartitioned.
RuntimeProfile::Counter* num_row_repartitioned_;
/// Number of partitions that have been repartitioned.
RuntimeProfile::Counter* num_repartitions_;
/// Number of partitions that have been spilled.
RuntimeProfile::Counter* num_spilled_partitions_;
/// The largest fraction after repartitioning. This is expected to be
/// 1 / PARTITION_FANOUT. A value much larger indicates skew.
RuntimeProfile::HighWaterMarkCounter* largest_partition_percent_;
/// Time spent in streaming preagg algorithm.
RuntimeProfile::Counter* streaming_timer_;
/// The number of rows passed through without aggregation.
RuntimeProfile::Counter* num_passthrough_rows_;
/// The estimated reduction of the preaggregation.
RuntimeProfile::Counter* preagg_estimated_reduction_;
/// Expose the minimum reduction factor to continue growing the hash tables.
RuntimeProfile::Counter* preagg_streaming_ht_min_reduction_;
/// The estimated number of input rows from the planner.
int64_t estimated_input_cardinality_;
/// BEGIN: Members that must be Reset()
/// Result of aggregation w/o GROUP BY.
/// Note: can be NULL even if there is no grouping if the result tuple is 0 width
/// e.g. select 1 from table group by col.
Tuple* singleton_output_tuple_;
bool singleton_output_tuple_returned_;
/// Row batch used as argument to GetNext() for the child node preaggregations. Store
/// in node to avoid reallocating for every GetNext() call when streaming.
boost::scoped_ptr<RowBatch> child_batch_;
/// If true, no more rows to output from partitions.
bool partition_eos_;
/// True if no more rows to process from child.
bool child_eos_;
/// Used for hash-related functionality, such as evaluating rows and calculating hashes.
/// It also owns the evaluators for the grouping and build expressions used during hash
/// table insertion and probing.
boost::scoped_ptr<HashTableCtx> ht_ctx_;
/// Object pool that holds the Partition objects in hash_partitions_.
boost::scoped_ptr<ObjectPool> partition_pool_;
/// Current partitions we are partitioning into. IMPALA-5788: For the case where we
/// rebuild a spilled partition that fits in memory, all pointers in this vector will
/// point to a single in-memory partition.
std::vector<Partition*> hash_partitions_;
/// Cache for hash tables in 'hash_partitions_'. IMPALA-5788: For the case where we
/// rebuild a spilled partition that fits in memory, all pointers in this array will
/// point to the hash table that is a part of a single in-memory partition.
HashTable* hash_tbls_[PARTITION_FANOUT];
/// All partitions that have been spilled and need further processing.
std::deque<Partition*> spilled_partitions_;
/// All partitions that are aggregated and can just return the results in GetNext().
/// After consuming all the input, hash_partitions_ is split into spilled_partitions_
/// and aggregated_partitions_, depending on if it was spilled or not.
std::deque<Partition*> aggregated_partitions_;
/// END: Members that must be Reset()
/// The hash table and streams (aggregated and unaggregated) for an individual
/// partition. The streams of each partition always (i.e. regardless of level)
/// initially use small buffers. Streaming pre-aggregations do not spill and do not
/// require an unaggregated stream.
struct Partition {
  /// Creates an open (not closed) partition owned by 'parent' at repartitioning
  /// 'level' with index 'idx'. The streams and the hash table are not created here;
  /// see InitStreams() and InitHashTable().
  Partition(PartitionedAggregationNode* parent, int level, int idx)
    : parent(parent), is_closed(false), level(level), idx(idx) {}

  /// Initializes aggregated_row_stream and unaggregated_row_stream (if a spilling
  /// aggregation), allocating one buffer for each. Spilling merge aggregations must
  /// have enough reservation for the initial buffer for the stream, so this should
  /// not fail due to OOM. Preaggregations do not reserve any buffers: if one does not
  /// have enough reservation for the initial buffer, the aggregated row stream is not
  /// created and an OK status is returned.
  Status InitStreams() WARN_UNUSED_RESULT;

  /// Initializes the hash table. 'aggregated_row_stream' must be non-NULL.
  /// Sets 'got_memory' to true if the hash table was initialised or false on OOM.
  Status InitHashTable(bool* got_memory) WARN_UNUSED_RESULT;

  /// Called in case we need to serialize aggregated rows. This step effectively does
  /// a merge aggregation in this node.
  Status SerializeStreamForSpilling() WARN_UNUSED_RESULT;

  /// Closes this partition. If finalize_rows is true, this iterates over all rows
  /// in aggregated_row_stream and finalizes them (this is only used in the cancellation
  /// path).
  void Close(bool finalize_rows);

  /// Spill this partition. 'more_aggregate_rows' = true means that more aggregate rows
  /// may be appended to the partition before appending unaggregated rows. On
  /// success, one of the streams is left with a write iterator: the aggregated stream
  /// if 'more_aggregate_rows' is true or the unaggregated stream otherwise.
  Status Spill(bool more_aggregate_rows) WARN_UNUSED_RESULT;

  /// Discards the aggregated row stream and hash table. Only valid to call if this is
  /// a streaming preaggregation and the initial memory allocation for hash tables or
  /// the aggregated stream failed. The aggregated stream must have 0 rows.
  void DiscardAggregatedRowStream();

  /// Returns true if this partition has no hash table. As described for 'hash_tbl',
  /// that is the case when the partition is spilled or when all of its rows are being
  /// passed through.
  bool is_spilled() const { return hash_tbl.get() == NULL; }

  /// The aggregation node that this partition belongs to.
  PartitionedAggregationNode* parent;

  /// If true, this partition is closed and there is nothing left to do.
  bool is_closed;

  /// How many times rows in this partition have been repartitioned. Partitions created
  /// from the node's children's input are level 0, 1 after the first repartitioning,
  /// etc.
  const int level;

  /// The index of this partition within 'hash_partitions_' at its level.
  const int idx;

  /// Hash table for this partition.
  /// Can be NULL if this partition is no longer maintaining a hash table (i.e.
  /// is spilled or we are passing through all rows for this partition).
  boost::scoped_ptr<HashTable> hash_tbl;

  /// Clone of parent's agg_fn_evals_ and backing MemPool.
  std::vector<AggFnEvaluator*> agg_fn_evals;
  boost::scoped_ptr<MemPool> agg_fn_pool;

  /// Tuple stream used to store aggregated rows. When the partition is not spilled,
  /// (meaning the hash table is maintained), this stream is pinned and contains the
  /// memory referenced by the hash table. When it is spilled, this consumes reservation
  /// for a write buffer only during repartitioning of aggregated rows.
  /// For streaming preaggs, this may be NULL if sufficient memory is not available.
  /// In that case hash_tbl is also NULL and all rows for the partition will be passed
  /// through.
  boost::scoped_ptr<BufferedTupleStream> aggregated_row_stream;

  /// Unaggregated rows that are spilled. Always NULL for streaming pre-aggregations.
  /// Always unpinned. Has a write buffer allocated when the partition is spilled and
  /// unaggregated rows are being processed.
  boost::scoped_ptr<BufferedTupleStream> unaggregated_row_stream;
};
/// Stream used to store serialized spilled rows. Only used if needs_serialize_
/// is set. This stream is never pinned and only used in Partition::Spill as a
/// temporary buffer.
boost::scoped_ptr<BufferedTupleStream> serialize_stream_;
/// Accessor for 'hash_tbls_' that verifies consistency with the partitions.
/// 'partition_idx' indexes both 'hash_tbls_' and 'hash_partitions_', so it must be in
/// the range [0, PARTITION_FANOUT). Returns the cached hash table pointer, which is
/// NULL when the partition at that index is not maintaining a hash table.
HashTable* ALWAYS_INLINE GetHashTable(int partition_idx) {
  HashTable* ht = hash_tbls_[partition_idx];
  // The cache must always mirror the hash table owned by the corresponding partition.
  DCHECK_EQ(ht, hash_partitions_[partition_idx]->hash_tbl.get());
  return ht;
}
/// Materializes 'row_batch' in either grouping or non-grouping case.
Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos);
/// Helper function called by GetNextInternal() to ensure that string data referenced in
/// 'row_batch' will live as long as 'row_batch's tuples. 'first_row_idx' indexes the
/// first row that should be processed in 'row_batch'.
Status HandleOutputStrings(RowBatch* row_batch, int first_row_idx);
/// Copies string data from the specified slot into 'pool', and sets the StringValues'
/// ptrs to the copied data. Copies data from all tuples in 'row_batch' from
/// 'first_row_idx' onwards. 'slot_desc' must have a var-len string type.
Status CopyStringData(const SlotDescriptor& slot_desc, RowBatch* row_batch,
int first_row_idx, MemPool* pool);
/// Constructs singleton output tuple, allocating memory from pool.
Tuple* ConstructSingletonOutputTuple(
const std::vector<AggFnEvaluator*>& agg_fn_evals, MemPool* pool);
/// Copies grouping values stored in 'ht_ctx_' that were computed over 'current_row_'
/// using 'grouping_expr_evals_'. Aggregation expr slots are set to their initial
/// values. Returns NULL if there was not enough memory to allocate the tuple or an
/// error occurred, in which case 'status' is set. Allocates tuple and var-len data for
/// grouping exprs from stream. Var-len data for aggregate exprs is allocated from the
/// FunctionContexts, so is stored outside the stream. If stream's small buffers get
/// full, it will attempt to switch to IO-buffers.
Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals,
BufferedTupleStream* stream, Status* status) noexcept;
/// Constructs intermediate tuple, allocating memory from pool instead of the stream.
/// Returns NULL and sets status if there is not enough memory to allocate the tuple.
Tuple* ConstructIntermediateTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals,
MemPool* pool, Status* status) noexcept;
/// Returns the number of bytes of variable-length data for the grouping values stored
/// in 'ht_ctx_'.
int GroupingExprsVarlenSize();
/// Initializes intermediate tuple by copying grouping values stored in 'ht_ctx_' that
/// were computed over 'current_row_' using 'grouping_expr_evals_'. Writes the
/// var-len data into buffer. 'buffer' points to the start of a buffer of at least the
/// size of the variable-length data: 'varlen_size'.
void CopyGroupingValues(Tuple* intermediate_tuple, uint8_t* buffer, int varlen_size);
/// Initializes the aggregate function slots of an intermediate tuple.
/// Any var-len data is allocated from the FunctionContexts.
void InitAggSlots(const std::vector<AggFnEvaluator*>& agg_fn_evals,
Tuple* intermediate_tuple);
/// Updates the given aggregation intermediate tuple with aggregation values computed
/// over 'row' using 'agg_fn_evals'. Whether the agg fn evaluator calls Update() or
/// Merge() is controlled by the evaluator itself, unless enforced explicitly by passing
/// in is_merge == true. The override is needed to merge spilled and non-spilled rows
/// belonging to the same partition independent of whether the agg fn evaluators have
/// is_merge() == true.
/// This function is replaced by codegen (which is why we don't use a vector argument
/// for agg_fn_evals). Any var-len data is allocated from the FunctionContexts.
/// TODO: Fix the arguments order. Need to update CodegenUpdateTuple() too.
void UpdateTuple(AggFnEvaluator** agg_fn_evals, Tuple* tuple, TupleRow* row,
bool is_merge = false) noexcept;
/// Called on the intermediate tuple of each group after all input rows have been
/// consumed and aggregated. Computes the final aggregate values to be returned in
/// GetNext() using the agg fn evaluators' Serialize() or Finalize().
/// For the Finalize() case if the output tuple is different from the intermediate
/// tuple, then a new tuple is allocated from 'pool' to hold the final result.
/// Grouping values are copied into the output tuple and the output tuple holding
/// the finalized/serialized aggregate values is returned.
/// TODO: Coordinate the allocation of new tuples with the release of memory
/// so as not to make memory consumption blow up.
Tuple* GetOutputTuple(const std::vector<AggFnEvaluator*>& agg_fn_evals,
Tuple* tuple, MemPool* pool);
/// Do the aggregation for all tuple rows in the batch when there is no grouping.
/// This function is replaced by codegen.
Status ProcessBatchNoGrouping(RowBatch* batch) WARN_UNUSED_RESULT;
/// Processes a batch of rows. This is the core function of the algorithm. We partition
/// the rows into hash_partitions_, spilling as necessary.
/// If AGGREGATED_ROWS is true, it means that the rows in the batch are already
/// pre-aggregated.
/// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
/// hash table buckets will be prefetched based on the hash values computed. Note
/// that 'prefetch_mode' will be substituted with constants during codegen time.
/// This function is replaced by codegen. We pass in ht_ctx_.get() as an argument for
/// performance.
template <bool AGGREGATED_ROWS>
Status IR_ALWAYS_INLINE ProcessBatch(RowBatch* batch, TPrefetchMode::type prefetch_mode,
HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
/// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the results in
/// the expression values cache in 'ht_ctx'. The number of rows evaluated depends on
/// the capacity of the cache. 'prefetch_mode' specifies the prefetching mode in use.
/// If it's not PREFETCH_NONE, hash table buckets for the computed hashes will be
/// prefetched. Note that codegen replaces 'prefetch_mode' with a constant.
template<bool AGGREGATED_ROWS>
void EvalAndHashPrefetchGroup(RowBatch* batch, int start_row_idx,
TPrefetchMode::type prefetch_mode, HashTableCtx* ht_ctx);
/// This function processes each individual row in ProcessBatch(). Must be inlined into
/// ProcessBatch for codegen to substitute function calls with codegen'd versions.
/// May spill partitions if not enough memory is available.
template <bool AGGREGATED_ROWS>
Status IR_ALWAYS_INLINE ProcessRow(
TupleRow* row, HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
/// Create a new intermediate tuple in partition, initialized with row. ht_ctx is
/// the context for the partition's hash table and hash is the precomputed hash of
/// the row. The row can be an unaggregated or aggregated row depending on
/// AGGREGATED_ROWS. Spills partitions if necessary to append the new intermediate
/// tuple to the partition's stream. Must be inlined into ProcessBatch for codegen
/// to substitute function calls with codegen'd versions. insert_it is an iterator
/// for insertion returned from HashTable::FindBuildRowBucket().
template <bool AGGREGATED_ROWS>
Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition, TupleRow* row,
uint32_t hash, HashTable::Iterator insert_it) WARN_UNUSED_RESULT;
/// Append a row to a spilled partition. The row may be aggregated or unaggregated
/// according to AGGREGATED_ROWS. May spill partitions if needed to append the row
/// buffers.
template <bool AGGREGATED_ROWS>
Status IR_ALWAYS_INLINE AppendSpilledRow(
Partition* partition, TupleRow* row) WARN_UNUSED_RESULT;
/// Reads all the rows from input_stream and process them by calling ProcessBatch().
template <bool AGGREGATED_ROWS>
Status ProcessStream(BufferedTupleStream* input_stream) WARN_UNUSED_RESULT;
/// Output 'singleton_output_tuple_' and transfer memory to 'row_batch'.
void GetSingletonOutput(RowBatch* row_batch);
/// Get rows for the next rowbatch from the next partition. Sets 'partition_eos_' to
/// true if all rows from all partitions have been returned or the limit is reached.
Status GetRowsFromPartition(
RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT;
/// Get output rows from child for streaming pre-aggregation. Aggregates some rows with
/// hash table and passes through other rows converted into the intermediate
/// tuple format. Sets 'child_eos_' once all rows from child have been returned.
Status GetRowsStreaming(RuntimeState* state, RowBatch* row_batch) WARN_UNUSED_RESULT;
/// Return true if we should keep expanding hash tables in the preagg. If false,
/// the preagg should pass through any rows it can't fit in its tables.
bool ShouldExpandPreaggHashTables() const;
/// Streaming processing of in_batch from child. Rows from child are either aggregated
/// into the hash table or added to 'out_batch' in the intermediate tuple format.
/// 'in_batch' is processed entirely, and 'out_batch' must have enough capacity to
/// store all of the rows in 'in_batch'.
/// 'needs_serialize' is an argument so that codegen can replace it with a constant,
/// rather than using the member variable 'needs_serialize_'.
/// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
/// hash table buckets will be prefetched based on the hash values computed. Note
/// that 'prefetch_mode' will be substituted with constants during codegen time.
/// 'remaining_capacity' is an array with PARTITION_FANOUT entries with the number of
/// additional rows that can be added to the hash table per partition. It is updated
/// by ProcessBatchStreaming() when it inserts new rows.
/// 'ht_ctx' is passed in as a way to avoid aliasing of 'this' confusing the optimiser.
/// The parameter list mirrors the ProcessBatchStreamingFn typedef (minus the leading
/// node pointer), so the jitted replacement can be called with the same arguments.
Status ProcessBatchStreaming(bool needs_serialize, TPrefetchMode::type prefetch_mode,
    RowBatch* in_batch, RowBatch* out_batch, HashTableCtx* ht_ctx,
    int remaining_capacity[PARTITION_FANOUT]) WARN_UNUSED_RESULT;
/// Tries to add intermediate to the hash table 'hash_tbl' of 'partition' for streaming
/// aggregation. The input row must have been evaluated with 'ht_ctx', with 'hash' set
/// to the corresponding hash. If the tuple already exists in the hash table, update
/// the tuple and return true. Otherwise try to create a new entry in the hash table,
/// returning true if successful or false if the table is full. 'remaining_capacity'
/// keeps track of how many more entries can be added to the hash table so we can avoid
/// retrying inserts. It is decremented if an insert succeeds and set to zero if an
/// insert fails. If an error occurs, returns false and sets 'status'.
bool IR_ALWAYS_INLINE TryAddToHashTable(HashTableCtx* ht_ctx, Partition* partition,
HashTable* hash_tbl, TupleRow* in_row, uint32_t hash, int* remaining_capacity,
Status* status) WARN_UNUSED_RESULT;
/// Initializes hash_partitions_. 'level' is the level for the partitions to create.
/// If 'single_partition_idx' is provided, it must be a number in range
/// [0, PARTITION_FANOUT), and only that partition is created - all others point to it.
/// Also sets ht_ctx_'s level to 'level'.
Status CreateHashPartitions(
int level, int single_partition_idx = -1) WARN_UNUSED_RESULT;
/// Ensure that hash tables for all in-memory partitions are large enough to fit
/// 'num_rows' additional hash table entries. If there is not enough memory to
/// resize the hash tables, may spill partitions. 'aggregated_rows' is true if
/// we're currently partitioning aggregated rows.
Status CheckAndResizeHashPartitions(
bool aggregated_rows, int num_rows, const HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
/// Prepares the next partition to return results from. On return, this function
/// initializes output_iterator_ and output_partition_. This either removes
/// a partition from aggregated_partitions_ (and is done) or removes the next
/// partition from spilled_partitions_ and repartitions it.
Status NextPartition() WARN_UNUSED_RESULT;
/// Tries to build the first partition in 'spilled_partitions_'.
/// If successful, sets *built_partition to the partition. The caller owns the partition
/// and is responsible for closing it. If unsuccessful because the partition could not
/// fit in memory, sets *built_partition to NULL and adds the spilled partition to the
/// head of 'spilled_partitions_' so it can be processed by
/// RepartitionSpilledPartition().
Status BuildSpilledPartition(Partition** built_partition) WARN_UNUSED_RESULT;
/// Repartitions the first partition in 'spilled_partitions_' into PARTITION_FANOUT
/// output partitions. On success, each output partition is one of:
/// * closed, if no rows were added to the partition.
/// * in 'spilled_partitions_', if the output partition spilled.
/// * in 'aggregated_partitions_', if the output partition was not spilled.
Status RepartitionSpilledPartition() WARN_UNUSED_RESULT;
/// Picks a partition from 'hash_partitions_' to spill. 'more_aggregate_rows' is
/// forwarded to Partition::Spill() when spilling the chosen partition. See the
/// Partition::Spill() comment for further explanation of that argument.
Status SpillPartition(bool more_aggregate_rows) WARN_UNUSED_RESULT;
/// Moves the partitions in hash_partitions_ to aggregated_partitions_ or
/// spilled_partitions_. Partitions moved to spilled_partitions_ are unpinned.
/// 'input_rows' is the number of input rows that have been repartitioned; it is
/// used for diagnostics.
Status MoveHashPartitions(int64_t input_rows) WARN_UNUSED_RESULT;
/// Adds a partition to the front of 'spilled_partitions_' for later processing.
/// 'spilled_partitions_' uses LIFO so that more finely partitioned partitions are
/// processed first. This allows us to delete pages earlier and bottom out the
/// recursion earlier and also improves time locality of access to spilled data on
/// disk.
void PushSpilledPartition(Partition* partition);
/// Closes all partitions: calls Close() on every Partition in
/// 'aggregated_partitions_', 'spilled_partitions_', and 'hash_partitions_' and then
/// resets the lists, the vector and the partition pool.
void ClosePartitions();
/// Calls finalize on all tuples in the hash table, starting at 'it'.
/// NOTE(review): presumably finalization is performed with 'agg_fn_evals' - confirm
/// against the implementation.
void CleanupHashTbl(const std::vector<AggFnEvaluator*>& agg_fn_evals,
HashTable::Iterator it);
/// Codegens the update of the aggregate expression agg_fns_[agg_fn_idx]
/// and returns the IR function in 'fn'. Returns non-OK status if codegen
/// is unsuccessful.
/// NOTE(review): 'slot_desc' is presumably the slot updated by the aggregate
/// function - confirm against the implementation.
Status CodegenUpdateSlot(LlvmCodeGen* codegen, int agg_fn_idx,
SlotDescriptor* slot_desc, llvm::Function** fn) WARN_UNUSED_RESULT;
/// Codegens a call to a function implementing the UDA interface with input values
/// from 'input_vals'. 'dst_val' should contain the previous value of the aggregate
/// function, and 'updated_dst_val' is set to the new value after the Update or Merge
/// operation is applied. The instruction sequence for the UDA call is inserted at
/// the insert position of 'builder'.
/// NOTE(review): 'agg_fn' presumably identifies the UDA to call and 'agg_fn_ctx_arg'
/// its function context argument - confirm against the implementation.
Status CodegenCallUda(LlvmCodeGen* codegen, LlvmBuilder* builder, AggFn* agg_fn,
llvm::Value* agg_fn_ctx_arg, const std::vector<CodegenAnyVal>& input_vals,
const CodegenAnyVal& dst_val, CodegenAnyVal* updated_dst_val) WARN_UNUSED_RESULT;
/// Codegens UpdateTuple(). Returns non-OK status if codegen is unsuccessful.
/// NOTE(review): presumably the resulting IR function is returned in 'fn' - confirm
/// against the implementation.
Status CodegenUpdateTuple(LlvmCodeGen* codegen, llvm::Function** fn) WARN_UNUSED_RESULT;
/// Codegen the non-streaming process row batch loop. The loop has already been
/// compiled to IR and loaded into the codegen object. UpdateAggTuple has also been
/// codegen'd to IR. This function will modify the loop, substituting the statically
/// compiled functions with codegen'd ones. 'process_batch_fn_' or
/// 'process_batch_no_grouping_fn_' will be updated with the codegen'd function
/// depending on whether this is a grouping or non-grouping aggregation.
/// Assumes AGGREGATED_ROWS = false.
Status CodegenProcessBatch(
LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
/// Codegen the materialization loop for streaming preaggregations.
/// 'process_batch_streaming_fn_' will be updated with the codegen'd function.
/// NOTE(review): presumably returns non-OK status if codegen is unsuccessful -
/// confirm against the implementation.
Status CodegenProcessBatchStreaming(
LlvmCodeGen* codegen, TPrefetchMode::type prefetch_mode) WARN_UNUSED_RESULT;
/// Compute minimum buffer reservation for grouping aggregations.
/// We need one buffer per partition, which is used either as the write buffer for the
/// aggregated stream or the unaggregated stream. We need an additional buffer to read
/// the stream we are currently repartitioning. The read buffer needs to be a max-sized
/// buffer to hold a max-sized row and we need one max-sized write buffer that is used
/// temporarily to append a row to any stream.
/// If we need to serialize, we need an additional buffer while spilling a partition
/// as the partition's aggregated stream needs to be serialized and rewritten.
/// We do not spill streaming preaggregations, so we do not need to reserve any buffers.
int64_t MinReservation() const {
// Must be kept in sync with AggregationNode.computeNodeResourceProfile() in fe.
if (is_streaming_preagg_) return 0; // Need 0 buffers to pass through rows.
int num_buffers = PARTITION_FANOUT + 1 + (needs_serialize_ ? 1 : 0);
// Two of the buffers must fit the maximum row.
return resource_profile_.spillable_buffer_size * (num_buffers - 2) +
resource_profile_.max_row_buffer_size * 2;