blob: fbbf4d303515ee3bd448979e95ec9a59ca6abff0 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Utilities for implementing [`GroupsAccumulator`], including an adapter
//! that makes a [`GroupsAccumulator`] out of any [`Accumulator`]
pub mod accumulate;
pub mod bool_op;
pub mod nulls;
pub mod prim_op;
use arrow::{
array::{ArrayRef, AsArray, BooleanArray, PrimitiveArray},
compute,
datatypes::UInt32Type,
};
use datafusion_common::{
arrow_datafusion_err, utils::take_arrays, DataFusionError, Result, ScalarValue,
};
use datafusion_expr_common::accumulator::Accumulator;
use datafusion_expr_common::groups_accumulator::{EmitTo, GroupsAccumulator};
/// An adapter that implements [`GroupsAccumulator`] for any [`Accumulator`]
///
/// While [`Accumulator`] are simpler to implement and can support
/// more general calculations (like retractable window functions),
/// they are not as fast as a specialized `GroupsAccumulator`. This
/// adapter bridges the gap: it lets any [`Accumulator`] be used
/// wherever a [`GroupsAccumulator`] is expected.
///
/// Internally, this adapter creates a new [`Accumulator`] for each group which
/// stores the state for that group. This both requires an allocation for each
/// Accumulator, internal indices, as well as whatever internal allocations the
/// Accumulator itself requires.
///
/// For example, a `MinAccumulator` that computes the minimum string value with
/// a [`ScalarValue::Utf8`]. That will require at least two allocations per group
/// (one for the `MinAccumulator` and one for the `ScalarValue::Utf8`).
///
/// ```text
///                       ┌─────────────────────────────────┐
///                       │MinAccumulator {                 │
///                ┌─────▶│ min: ScalarValue::Utf8("A")     │───────┐
///                │      │}                                │       │
///                │      └─────────────────────────────────┘       └───────▶   "A"
///    ┌─────┐     │      ┌─────────────────────────────────┐
///    │  0  │─────┘      │MinAccumulator {                 │
///    ├─────┤     ┌─────▶│ min: ScalarValue::Utf8("Z")     │───────────────▶   "Z"
///    │  1  │─────┘      │}                                │
///    └─────┘            └─────────────────────────────────┘                   ...
///      ...                 ...
///    ┌─────┐            ┌────────────────────────────────┐
///    │ N-2 │            │MinAccumulator {                │
///    ├─────┤            │  min: ScalarValue::Utf8("A")   │────────────────▶   "A"
///    │ N-1 │─────┐      │}                               │
///    └─────┘     │      └────────────────────────────────┘
///                │      ┌────────────────────────────────┐        ┌───────▶   "Q"
///                │      │MinAccumulator {                │        │
///                └─────▶│  min: ScalarValue::Utf8("Q")   │────────┘
///                       │}                               │
///                       └────────────────────────────────┘
///
///
///  Logical group      Current Min/Max value for that group stored
///     number          as a ScalarValue which points to an
///                     individually allocated String
///
/// ```
///
/// # Optimizations
///
/// The adapter minimizes the number of calls to [`Accumulator::update_batch`]
/// by first collecting the input rows for each group into a contiguous array
/// using [`compute::take`]
///
pub struct GroupsAccumulatorAdapter {
/// Factory that creates a fresh, empty [`Accumulator`]. It is invoked
/// once for every new group, and once per input row when converting
/// raw input directly to intermediate state.
factory: Box<dyn Fn() -> Result<Box<dyn Accumulator>> + Send>,
/// state for each group, stored in group_index order
states: Vec<AccumulatorState>,
/// Current memory usage, in bytes.
///
/// Note this is incrementally updated with deltas to avoid the
/// call to size() being a bottleneck. We saw size() being a
/// bottleneck in earlier implementations when there were many
/// distinct groups.
allocation_bytes: usize,
}
/// Per-group state kept by [`GroupsAccumulatorAdapter`]: the group's own
/// [`Accumulator`] plus scratch space listing the input rows routed to it.
struct AccumulatorState {
/// [`Accumulator`] that stores the per-group state
accumulator: Box<dyn Accumulator>,
/// scratch space: indexes in the input array that will be fed to
/// this accumulator. Stores indexes as `u32` to match the arrow
/// `take` kernel input.
indices: Vec<u32>,
}
impl AccumulatorState {
    /// Wraps `accumulator` together with an empty scratch index buffer.
    fn new(accumulator: Box<dyn Accumulator>) -> Self {
        let indices = Vec::new();
        Self {
            accumulator,
            indices,
        }
    }

    /// Total bytes attributed to this state: the accumulator's own
    /// reported size, the struct itself, and the heap buffer backing the
    /// scratch indices.
    fn size(&self) -> usize {
        let accumulator_bytes = self.accumulator.size();
        let inline_bytes = std::mem::size_of_val(self);
        let scratch_bytes = self.indices.allocated_size();
        accumulator_bytes + inline_bytes + scratch_bytes
    }
}
impl GroupsAccumulatorAdapter {
/// Create a new adapter that will create a new [`Accumulator`]
/// for each group, using the specified factory function
pub fn new<F>(factory: F) -> Self
where
F: Fn() -> Result<Box<dyn Accumulator>> + Send + 'static,
{
Self {
factory: Box::new(factory),
states: vec![],
allocation_bytes: 0,
}
}
/// Ensure that self.accumulators has total_num_groups
fn make_accumulators_if_needed(&mut self, total_num_groups: usize) -> Result<()> {
// can't shrink
assert!(total_num_groups >= self.states.len());
let vec_size_pre = self.states.allocated_size();
// instantiate new accumulators
let new_accumulators = total_num_groups - self.states.len();
for _ in 0..new_accumulators {
let accumulator = (self.factory)()?;
let state = AccumulatorState::new(accumulator);
self.add_allocation(state.size());
self.states.push(state);
}
self.adjust_allocation(vec_size_pre, self.states.allocated_size());
Ok(())
}
/// invokes f(accumulator, values) for each group that has values
/// in group_indices.
///
/// This function first reorders the input and filter so that
/// values for each group_index are contiguous and then invokes f
/// on the contiguous ranges, to minimize per-row overhead
///
/// ```text
/// ┌─────────┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐ ┌─────────┐ ┌ ─ ─ ─ ─ ┐
/// │ ┌─────┐ │ │ ┌─────┐ │ ┌─────┐ ┏━━━━━┓ │ ┌─────┐ │ ┌─────┐
/// │ │ 2 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 0 ┃ │ │ 200 │ │ │ │ t │ │
/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
/// │ │ 2 │ │ │ │ 100 │ │ │ │ f │ │ ┃ 0 ┃ │ │ 300 │ │ │ │ t │ │
/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
/// │ │ 0 │ │ │ │ 200 │ │ │ │ t │ │ ┃ 1 ┃ │ │ 200 │ │ │ │NULL │ │
/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ────────▶ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
/// │ │ 1 │ │ │ │ 200 │ │ │ │NULL │ │ ┃ 2 ┃ │ │ 200 │ │ │ │ t │ │
/// │ ├─────┤ │ │ ├─────┤ │ ├─────┤ ┣━━━━━┫ │ ├─────┤ │ ├─────┤
/// │ │ 0 │ │ │ │ 300 │ │ │ │ t │ │ ┃ 2 ┃ │ │ 100 │ │ │ │ f │ │
/// │ └─────┘ │ │ └─────┘ │ └─────┘ ┗━━━━━┛ │ └─────┘ │ └─────┘
/// └─────────┘ └─────────┘ └ ─ ─ ─ ─ ┘ └─────────┘ └ ─ ─ ─ ─ ┘
///
/// logical group values opt_filter logical group values opt_filter
///
/// ```
fn invoke_per_accumulator<F>(
&mut self,
values: &[ArrayRef],
group_indices: &[usize],
opt_filter: Option<&BooleanArray>,
total_num_groups: usize,
f: F,
) -> Result<()>
where
F: Fn(&mut dyn Accumulator, &[ArrayRef]) -> Result<()>,
{
self.make_accumulators_if_needed(total_num_groups)?;
assert_eq!(values[0].len(), group_indices.len());
// figure out which input rows correspond to which groups.
// Note that self.state.indices starts empty for all groups
// (it is cleared out below)
for (idx, group_index) in group_indices.iter().enumerate() {
self.states[*group_index].indices.push(idx as u32);
}
// groups_with_rows holds a list of group indexes that have
// any rows that need to be accumulated, stored in order of
// group_index
let mut groups_with_rows = vec![];
// batch_indices holds indices into values, each group is contiguous
let mut batch_indices = vec![];
// offsets[i] is index into batch_indices where the rows for
// group_index i starts
let mut offsets = vec![0];
let mut offset_so_far = 0;
for (group_index, state) in self.states.iter_mut().enumerate() {
let indices = &state.indices;
if indices.is_empty() {
continue;
}
groups_with_rows.push(group_index);
batch_indices.extend_from_slice(indices);
offset_so_far += indices.len();
offsets.push(offset_so_far);
}
let batch_indices = batch_indices.into();
// reorder the values and opt_filter by batch_indices so that
// all values for each group are contiguous, then invoke the
// accumulator once per group with values
let values = take_arrays(values, &batch_indices)?;
let opt_filter = get_filter_at_indices(opt_filter, &batch_indices)?;
// invoke each accumulator with the appropriate rows, first
// pulling the input arguments for this group into their own
// RecordBatch(es)
let iter = groups_with_rows.iter().zip(offsets.windows(2));
let mut sizes_pre = 0;
let mut sizes_post = 0;
for (&group_idx, offsets) in iter {
let state = &mut self.states[group_idx];
sizes_pre += state.size();
let values_to_accumulate = slice_and_maybe_filter(
&values,
opt_filter.as_ref().map(|f| f.as_boolean()),
offsets,
)?;
f(state.accumulator.as_mut(), &values_to_accumulate)?;
// clear out the state so they are empty for next
// iteration
state.indices.clear();
sizes_post += state.size();
}
self.adjust_allocation(sizes_pre, sizes_post);
Ok(())
}
/// Increment the allocation by `n`
///
/// See [`Self::allocation_bytes`] for rationale.
fn add_allocation(&mut self, size: usize) {
self.allocation_bytes += size;
}
/// Decrease the allocation by `n`
///
/// See [`Self::allocation_bytes`] for rationale.
fn free_allocation(&mut self, size: usize) {
// use saturating sub to avoid errors if the accumulators
// report erronious sizes
self.allocation_bytes = self.allocation_bytes.saturating_sub(size)
}
/// Adjusts the allocation for something that started with
/// start_size and now has new_size avoiding overflow
///
/// See [`Self::allocation_bytes`] for rationale.
fn adjust_allocation(&mut self, old_size: usize, new_size: usize) {
if new_size > old_size {
self.add_allocation(new_size - old_size)
} else {
self.free_allocation(old_size - new_size)
}
}
}
impl GroupsAccumulator for GroupsAccumulatorAdapter {
    /// Routes every input row to its group's accumulator through
    /// [`Accumulator::update_batch`].
    fn update_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        self.invoke_per_accumulator(
            values,
            group_indices,
            opt_filter,
            total_num_groups,
            |group_accumulator, group_values| group_accumulator.update_batch(group_values),
        )
    }

    /// Removes the groups selected by `emit_to` and returns their final
    /// values as a single array, one element per emitted group.
    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
        let states_bytes_before = self.states.allocated_size();
        let emitted = emit_to.take_needed(&mut self.states);

        let mut finals: Vec<ScalarValue> = Vec::with_capacity(emitted.len());
        for mut group_state in emitted {
            // the emitted state is dropped at the end of this iteration,
            // so stop tracking its memory
            self.free_allocation(group_state.size());
            finals.push(group_state.accumulator.evaluate()?);
        }

        let output = ScalarValue::iter_to_array(finals);
        self.adjust_allocation(states_bytes_before, self.states.allocated_size());
        output
    }

    /// Removes the groups selected by `emit_to` and returns their
    /// intermediate state, one array per state column with one element
    /// per emitted group.
    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
        let states_bytes_before = self.states.allocated_size();
        let emitted = emit_to.take_needed(&mut self.states);

        // each accumulator produces a row of state values; transpose
        // those rows into one vector per state column
        let mut columns: Vec<Vec<ScalarValue>> = vec![];
        for mut group_state in emitted {
            self.free_allocation(group_state.size());
            let row = group_state.accumulator.state()?;
            columns.resize_with(row.len(), Vec::new);
            for (column, state_val) in columns.iter_mut().zip(row) {
                column.push(state_val);
            }
        }

        // create an array for each intermediate column
        let mut arrays = Vec::with_capacity(columns.len());
        for column in columns {
            arrays.push(ScalarValue::iter_to_array(column)?);
        }

        // double check each array has the same length (aka the
        // accumulator was implemented correctly)
        if let Some((head, rest)) = arrays.split_first() {
            for arr in rest {
                assert_eq!(arr.len(), head.len());
            }
        }

        self.adjust_allocation(states_bytes_before, self.states.allocated_size());
        Ok(arrays)
    }

    /// Routes every row of intermediate state to its group's accumulator
    /// through [`Accumulator::merge_batch`].
    fn merge_batch(
        &mut self,
        values: &[ArrayRef],
        group_indices: &[usize],
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        self.invoke_per_accumulator(
            values,
            group_indices,
            opt_filter,
            total_num_groups,
            |group_accumulator, group_states| group_accumulator.merge_batch(group_states),
        )
    }

    /// Returns the incrementally tracked memory usage, in bytes.
    fn size(&self) -> usize {
        self.allocation_bytes
    }

    /// Converts raw input rows straight into intermediate state, treating
    /// each row as its own group: a fresh accumulator is created per row,
    /// fed that single (optionally filtered) row, and asked for its state.
    fn convert_to_state(
        &self,
        values: &[ArrayRef],
        opt_filter: Option<&BooleanArray>,
    ) -> Result<Vec<ArrayRef>> {
        let num_rows = values[0].len();

        // one vector per state column, each receiving one value per row
        let mut columns: Vec<Vec<ScalarValue>> = vec![];
        for row_idx in 0..num_rows {
            // start from an empty accumulator for every row
            let mut row_accumulator = (self.factory)()?;

            // feed it just this row and read back the resulting state
            let row_values =
                slice_and_maybe_filter(values, opt_filter, &[row_idx, row_idx + 1])?;
            row_accumulator.update_batch(&row_values)?;
            let row_state = row_accumulator.state()?;

            // make sure there is one column per state value
            columns.resize_with(row_state.len(), || Vec::with_capacity(num_rows));
            for (column, state_val) in columns.iter_mut().zip(row_state) {
                column.push(state_val);
            }
        }

        columns
            .into_iter()
            .map(ScalarValue::iter_to_array)
            .collect()
    }

    /// This adapter always supports [`Self::convert_to_state`].
    fn supports_convert_to_state(&self) -> bool {
        true
    }
}
/// Extension trait for [`Vec`] that reports how much heap memory the
/// vector has reserved.
pub trait VecAllocExt {
    /// Element type stored in the vector.
    type T;

    /// Bytes reserved by this vector's own buffer, i.e. its capacity (not
    /// its length) times the element size. Heap allocations owned by the
    /// elements themselves are not counted, and neither is the size of
    /// `self`.
    fn allocated_size(&self) -> usize;
}

impl<Elem> VecAllocExt for Vec<Elem> {
    type T = Elem;

    fn allocated_size(&self) -> usize {
        let bytes_per_element = std::mem::size_of::<Elem>();
        self.capacity() * bytes_per_element
    }
}
/// Reorders the optional filter with the `take` kernel so that its rows
/// line up with values gathered using the same `indices`.
///
/// Returns `Ok(None)` when there is no filter; arrow errors are converted
/// to [`DataFusionError`].
fn get_filter_at_indices(
    opt_filter: Option<&BooleanArray>,
    indices: &PrimitiveArray<UInt32Type>,
) -> Result<Option<ArrayRef>> {
    match opt_filter {
        None => Ok(None),
        // `None` options: no index check
        Some(filter) => compute::take(&filter, indices, None)
            .map(Some)
            .map_err(|e| arrow_datafusion_err!(e)),
    }
}
// Copied from physical-plan
/// Returns the rows `offsets[0]..offsets[1]` of every array in
/// `aggr_array`, keeping only the rows selected by the same window of
/// `filter_opt` when a filter is supplied.
///
/// Indexes `offsets[0]` and `offsets[1]` directly, so `offsets` must hold
/// at least two entries with `offsets[0] <= offsets[1]`.
pub(crate) fn slice_and_maybe_filter(
    aggr_array: &[ArrayRef],
    filter_opt: Option<&BooleanArray>,
    offsets: &[usize],
) -> Result<Vec<ArrayRef>> {
    let start = offsets[0];
    let len = offsets[1] - start;

    // zero-copy window over each input column
    let window = aggr_array.iter().map(|column| column.slice(start, len));

    match filter_opt {
        None => Ok(window.collect()),
        Some(mask) => {
            // restrict the filter to the same window before applying it
            let mask = mask.slice(start, len);
            window
                .map(|column| {
                    compute::filter(&column, &mask).map_err(|e| arrow_datafusion_err!(e))
                })
                .collect()
        }
    }
}