blob: 2829a9416f033b7273a098737ce9a63fdfee32a3 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Accumulator module: contains the trait definition for the accumulators of aggregate functions.
use arrow::array::ArrayRef;
use datafusion_common::{internal_err, Result, ScalarValue};
use std::fmt::Debug;
/// Tracks an aggregate function's state.
///
/// `Accumulator`s are stateful objects that track the state for a single
/// group. They aggregate values from multiple rows together into a final
/// output aggregate.
///
/// `GroupsAccumulator` is an additional, more performant (but also more
/// complex) API that manages state for multiple groups at once.
///
/// An accumulator knows how to:
/// * update its state from inputs via [`update_batch`]
///
/// * compute the final value from its internal state via [`evaluate`]
///
/// * retract an update to its state from given inputs via
/// [`retract_batch`] (when used as a window aggregate [window
/// function])
///
/// * convert its internal state to a vector of aggregate values via
/// [`state`] and combine the state from multiple accumulators
/// via [`merge_batch`], as part of efficient multi-phase grouping.
///
/// [`update_batch`]: Self::update_batch
/// [`retract_batch`]: Self::retract_batch
/// [`state`]: Self::state
/// [`evaluate`]: Self::evaluate
/// [`merge_batch`]: Self::merge_batch
/// [window function]: https://en.wikipedia.org/wiki/Window_function_(SQL)
pub trait Accumulator: Send + Sync + Debug {
/// Updates the accumulator's state from its input.
///
/// `values` contains the arguments to this aggregate function.
///
/// For example, the `SUM` accumulator maintains a running sum,
/// and `update_batch` adds each of the input values to the
/// running sum.
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
/// Returns the final aggregate value, consuming the internal state.
///
/// For example, the `SUM` accumulator maintains a running sum,
/// and `evaluate` will produce that running sum as its output.
///
/// This function should not be called twice, otherwise it will
/// result in potentially non-deterministic behavior.
///
/// This function gets `&mut self` to allow for the accumulator to build
/// arrow-compatible internal state that can be returned without copying
/// when possible (for example distinct strings).
fn evaluate(&mut self) -> Result<ScalarValue>;
/// Returns the allocated size required for this accumulator, in
/// bytes, including `Self`.
///
/// This value is used to calculate the memory used during
/// execution so DataFusion can stay within its allotted limit.
///
/// "Allocated" means that for internal containers such as `Vec`,
/// the `capacity` should be used, not the `len`.
fn size(&self) -> usize;
/// Returns the intermediate state of the accumulator, consuming the
/// intermediate state.
///
/// This function should not be called twice, otherwise it will
/// result in potentially non-deterministic behavior.
///
/// This function gets `&mut self` to allow for the accumulator to build
/// arrow-compatible internal state that can be returned without copying
/// when possible (for example distinct strings).
///
/// Intermediate state is used for "multi-phase" grouping in
/// DataFusion, where an aggregate is computed in parallel with
/// multiple `Accumulator` instances, as described below:
///
/// # Multi-Phase Grouping
///
/// ```text
///                               ▲
///                               │                   evaluate() is called to
///                               │                   produce the final aggregate
///                               │                   value per group
///                               │
///                  ┌─────────────────────────┐
///                  │GroupBy                  │
///                  │(AggregateMode::Final)   │      state() is called for each
///                  │                         │      group and the resulting
///                  └─────────────────────────┘      RecordBatches passed to the
///                                                   Final GroupBy via merge_batch()
///                               ▲
///                               │
///              ┌────────────────┴───────────────┐
///              │                                │
///              │                                │
/// ┌─────────────────────────┐      ┌─────────────────────────┐
/// │        GroupBy          │      │        GroupBy          │
/// │(AggregateMode::Partial) │      │(AggregateMode::Partial) │
/// └─────────────────────────┘      └─────────────────────────┘
///              ▲                                ▲
///              │                                │   update_batch() is called for
///              │                                │   each input RecordBatch
///         .─────────.                      .─────────.
///      ,─'           '─.                ,─'           '─.
///     ;      Input      :              ;      Input      :
///     :   Partition 0   ;              :   Partition 1   ;
///      ╲               ╱                ╲               ╱
///       '─.         ,─'                  '─.         ,─'
///          `───────'                        `───────'
/// ```
///
/// The partial state is serialized as `Arrays` and then combined
/// with other partial states from different instances of this
/// Accumulator (that ran on different partitions, for example).
///
/// The state can be and often is a different type than the output
/// type of the [`Accumulator`] and needs different merge
/// operations (for example, the partial state for `COUNT` needs
/// to be summed together).
///
/// Some accumulators can return multiple values for their
/// intermediate states. For example, the average accumulator
/// tracks `sum` and `n`, and this function should return a vector
/// of two values, sum and n.
///
/// Note that [`ScalarValue::List`] can be used to pass multiple
/// values if the number of intermediate values is not known at
/// planning time (e.g. for `MEDIAN`).
///
/// # Multi-phase repartitioned Grouping
///
/// Many multi-phase grouping plans contain a Repartition operation
/// as well, as shown below:
///
/// ```text
///                ▲                          ▲
///                │                          │
///                │                          │
///                │                          │
///                │                          │
///                │                          │
///    ┌───────────────────────┐  ┌───────────────────────┐       4. Each AggregateMode::Final
///    │GroupBy                │  │GroupBy                │       GroupBy has an entry for its
///    │(AggregateMode::Final) │  │(AggregateMode::Final) │       subset of groups (in this case
///    │                       │  │                       │       that means half the entries)
///    └───────────────────────┘  └───────────────────────┘
///                ▲                          ▲
///                │                          │
///                └─────────────┬────────────┘
///                              │
///                              │
///                              │
///                 ┌─────────────────────────┐                   3. Repartitioning by hash(group
///                 │       Repartition       │                   keys) ensures that each distinct
///                 │         HASH(x)         │                   group key now appears in exactly
///                 └─────────────────────────┘                   one partition
///                              ▲
///                              │
///              ┌───────────────┴─────────────┐
///              │                             │
///              │                             │
/// ┌─────────────────────────┐  ┌──────────────────────────┐     2. Each AggregateMode::Partial
/// │        GroupBy          │  │         GroupBy          │     GroupBy has an entry for *all*
/// │(AggregateMode::Partial) │  │ (AggregateMode::Partial) │     the groups
/// └─────────────────────────┘  └──────────────────────────┘
///              ▲                             ▲
///              │                             │
///              │                             │
///         .─────────.                   .─────────.
///      ,─'           '─.             ,─'           '─.
///     ;      Input      :           ;      Input      :         1. Since input data is
///     :   Partition 0   ;           :   Partition 1   ;         arbitrarily or RoundRobin
///      ╲               ╱             ╲               ╱          distributed, each partition
///       '─.         ,─'               '─.         ,─'           likely has all distinct
///          `───────'                     `───────'              group keys
/// ```
///
/// This structure is used so that the `AggregateMode::Partial` accumulators
/// reduce the cardinality of the input as soon as possible. Typically,
/// each partial accumulator sees all groups in the input as the group keys
/// are evenly distributed across the input.
///
/// The final output is computed by repartitioning the result of
/// [`Self::state`] from each Partial aggregate by `hash(group keys)` so
/// that each distinct group key appears in exactly one of the
/// `AggregateMode::Final` GroupBy nodes. The outputs of the final nodes are
/// then unioned together to produce the overall final output.
///
/// Here is an example that shows the distribution of groups in the
/// different phases:
///
/// ```text
///               ┌─────┐                ┌─────┐
///               │  1  │                │  3  │
///               ├─────┤                ├─────┤
///               │  2  │                │  4  │                After repartitioning by
///               └─────┘                └─────┘                hash(group keys), each distinct
///               ┌─────┐                ┌─────┐                group key now appears in exactly
///               │  1  │                │  3  │                one partition
///               ├─────┤                ├─────┤
///               │  2  │                │  4  │
///               └─────┘                └─────┘
///
///
/// ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─
///
///               ┌─────┐                ┌─────┐
///               │  2  │                │  2  │
///               ├─────┤                ├─────┤
///               │  1  │                │  2  │
///               ├─────┤                ├─────┤
///               │  3  │                │  3  │
///               ├─────┤                ├─────┤
///               │  4  │                │  1  │
///               └─────┘                └─────┘                Input data is arbitrarily or
///                 ...                    ...                  RoundRobin distributed, each
///               ┌─────┐                ┌─────┐                partition likely has all
///               │  1  │                │  4  │                distinct group keys
///               ├─────┤                ├─────┤
///               │  4  │                │  3  │
///               ├─────┤                ├─────┤
///               │  1  │                │  1  │
///               ├─────┤                ├─────┤
///               │  4  │                │  3  │
///               └─────┘                └─────┘
///
///            group values           group values
///           in partition 0         in partition 1
/// ```
fn state(&mut self) -> Result<Vec<ScalarValue>>;
/// Updates the accumulator's state from an `Array` containing one
/// or more intermediate values.
///
/// For some aggregates (such as `SUM`), `merge_batch` is the same
/// as `update_batch`, but for some aggregates (such as `COUNT`)
/// the operations differ. See [`Self::state`] for more details on how
/// state is used and merged.
///
/// The `states` array passed was formed by concatenating the
/// results of calling [`Self::state`] on zero or more other
/// `Accumulator` instances.
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()>;
/// Retracts (removes) an update (caused by the given inputs) to
/// accumulator's state.
///
/// This is the inverse operation of [`Self::update_batch`] and is used
/// to incrementally calculate window aggregates where the `OVER`
/// clause defines a bounded window.
///
/// # Example
///
/// For example, given the following input partition
///
/// ```text
///                     │      current      │
///                            window
///                     │                   │
///                ┌────┬────┬────┬────┬────┬────┬────┬────┬────┐
///     Input      │ A  │ B  │ C  │ D  │ E  │ F  │ G  │ H  │ I  │
///   partition    └────┴────┴────┴────┼────┴────┴────┴────┼────┘
///
///                                    │       next        │
///                                           window
/// ```
///
/// First, [`Self::evaluate`] will be called to produce the output
/// for the current window.
///
/// Then, to advance to the next window:
///
/// First, [`Self::retract_batch`] will be called with the values
/// that are leaving the window, `[B, C, D]` and then
/// [`Self::update_batch`] will be called with the values that are
/// entering the window, `[F, G, H]`.
///
/// # Errors
///
/// The default implementation does not retract anything: it ignores
/// its input and always returns an internal error. Accumulators that
/// report `true` from [`Self::supports_retract_batch`] should
/// override it.
fn retract_batch(&mut self, _values: &[ArrayRef]) -> Result<()> {
// TODO add retract for all accumulators
// Default: retraction is unsupported, so report an internal error.
internal_err!(
"Retract should be implemented for aggregate functions when used with custom window frame queries"
)
}
/// Does the accumulator support incrementally updating its value
/// by *removing* values.
///
/// If this function returns true, [`Self::retract_batch`] will be
/// called for sliding window functions such as queries with an
/// `OVER (ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING)`.
///
/// The default implementation returns `false`.
fn supports_retract_batch(&self) -> bool {
false
}
}