blob: 5a7598ea1f299da9cea4ed483fa3c6d9b142965b [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow::array::{Array, NullArray, UInt64Array};
use arrow::array::{ArrayRef, BooleanArray};
use arrow::datatypes::{FieldRef, Schema, SchemaRef};
use std::collections::HashSet;
use std::sync::Arc;
use crate::error::DataFusionError;
use crate::stats::Precision;
use crate::{Column, Statistics};
use crate::{ColumnStatistics, ScalarValue};
/// A source of runtime statistical information to [`PruningPredicate`]s.
///
/// # Supported Information
///
/// 1. Minimum and maximum values for columns
///
/// 2. Null counts and row counts for columns
///
/// 3. Whether the values in a column are contained in a set of literals
///
/// # Vectorized Interface
///
/// Information for containers / files are returned as Arrow [`ArrayRef`], so
/// the evaluation happens once on a single `RecordBatch`, which amortizes the
/// overhead of evaluating the predicate. This is important when pruning 1000s
/// of containers which often happens in analytic systems that have 1000s of
/// potential files to consider.
///
/// For example, for the following three files with a single column `a`:
/// ```text
/// file1: column a: min=5, max=10
/// file2: column a: No stats
/// file2: column a: min=20, max=30
/// ```
///
/// PruningStatistics would return:
///
/// ```text
/// min_values("a") -> Some([5, Null, 20])
/// max_values("a") -> Some([10, Null, 30])
/// min_values("X") -> None
/// ```
///
/// [`PruningPredicate`]: https://docs.rs/datafusion/latest/datafusion/physical_optimizer/pruning/struct.PruningPredicate.html
pub trait PruningStatistics {
/// Return the minimum values for the named column, if known.
///
/// If the minimum value for a particular container is not known, the
/// returned array should have `null` in that row. If the minimum value is
/// not known for any row, return `None`.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
fn min_values(&self, column: &Column) -> Option<ArrayRef>;
/// Return the maximum values for the named column, if known.
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
fn max_values(&self, column: &Column) -> Option<ArrayRef>;
/// Return the number of containers (e.g. Row Groups) being pruned with
/// these statistics.
///
/// This value corresponds to the size of the [`ArrayRef`] returned by
/// [`Self::min_values`], [`Self::max_values`], [`Self::null_counts`],
/// and [`Self::row_counts`].
fn num_containers(&self) -> usize;
/// Return the number of null values for the named column as an
/// [`UInt64Array`]
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
///
/// [`UInt64Array`]: arrow::array::UInt64Array
fn null_counts(&self, column: &Column) -> Option<ArrayRef>;
/// Return the number of rows for the named column in each container
/// as an [`UInt64Array`].
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
///
/// [`UInt64Array`]: arrow::array::UInt64Array
fn row_counts(&self, column: &Column) -> Option<ArrayRef>;
/// Returns [`BooleanArray`] where each row represents information known
/// about specific literal `values` in a column.
///
/// For example, Parquet Bloom Filters implement this API to communicate
/// that `values` are known not to be present in a Row Group.
///
/// The returned array has one row for each container, with the following
/// meanings:
/// * `true` if the values in `column` ONLY contain values from `values`
/// * `false` if the values in `column` are NOT ANY of `values`
/// * `null` if the neither of the above holds or is unknown.
///
/// If these statistics can not determine column membership for any
/// container, return `None` (the default).
///
/// Note: the returned array must contain [`Self::num_containers`] rows
fn contained(
&self,
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray>;
}
/// Prune files based on their partition values.
///
/// This is used both at planning time and execution time to prune
/// files based on their partition values.
/// This feeds into [`CompositePruningStatistics`] to allow pruning
/// with filters that depend both on partition columns and data columns
/// (e.g. `WHERE partition_col = data_col`).
#[deprecated(
since = "52.0.0",
note = "This struct is no longer used internally. Use `replace_columns_with_literals` from `datafusion-physical-expr-adapter` to substitute partition column values before pruning. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
)]
#[derive(Clone)]
pub struct PartitionPruningStatistics {
/// Values for each column for each container.
///
/// The outer vectors represent the columns while the inner vectors
/// represent the containers. The order must match the order of the
/// partition columns in [`PartitionPruningStatistics::partition_schema`].
partition_values: Vec<ArrayRef>,
/// The number of containers.
///
/// Stored since the partition values are column-major and if
/// there are no columns we wouldn't know the number of containers.
num_containers: usize,
/// The schema of the partition columns.
///
/// This must **not** be the schema of the entire file or table: it must
/// only be the schema of the partition columns, in the same order as the
/// values in [`PartitionPruningStatistics::partition_values`].
partition_schema: SchemaRef,
}
#[expect(deprecated)]
impl PartitionPruningStatistics {
/// Create a new instance of [`PartitionPruningStatistics`].
///
/// Args:
/// * `partition_values`: A vector of vectors of [`ScalarValue`]s.
/// The outer vector represents the containers while the inner
/// vector represents the partition values for each column.
/// Note that this is the **opposite** of the order of the
/// partition columns in `PartitionPruningStatistics::partition_schema`.
/// * `partition_schema`: The schema of the partition columns.
/// This must **not** be the schema of the entire file or table:
/// instead it must only be the schema of the partition columns,
/// in the same order as the values in `partition_values`.
///
/// # Example
///
/// To create [`PartitionPruningStatistics`] for two partition columns `a` and `b`,
/// for three containers like this:
///
/// | a | b |
/// | - | - |
/// | 1 | 2 |
/// | 3 | 4 |
/// | 5 | 6 |
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_common::ScalarValue;
/// # use arrow::datatypes::{DataType, Field};
/// # use datafusion_common::pruning::PartitionPruningStatistics;
///
/// let partition_values = vec![
/// vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
/// vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
/// vec![ScalarValue::from(5i32), ScalarValue::from(6i32)],
/// ];
/// let partition_fields = vec![
/// Arc::new(Field::new("a", DataType::Int32, false)),
/// Arc::new(Field::new("b", DataType::Int32, false)),
/// ];
/// let partition_stats =
/// PartitionPruningStatistics::try_new(partition_values, partition_fields).unwrap();
/// ```
pub fn try_new(
partition_values: Vec<Vec<ScalarValue>>,
partition_fields: Vec<FieldRef>,
) -> Result<Self, DataFusionError> {
let num_containers = partition_values.len();
let partition_schema = Arc::new(Schema::new(partition_fields));
let mut partition_values_by_column =
vec![
Vec::with_capacity(partition_values.len());
partition_schema.fields().len()
];
for partition_value in partition_values {
for (i, value) in partition_value.into_iter().enumerate() {
partition_values_by_column[i].push(value);
}
}
Ok(Self {
partition_values: partition_values_by_column
.into_iter()
.map(|v| {
if v.is_empty() {
Ok(Arc::new(NullArray::new(0)) as ArrayRef)
} else {
ScalarValue::iter_to_array(v)
}
})
.collect::<Result<Vec<_>, _>>()?,
num_containers,
partition_schema,
})
}
}
#[expect(deprecated)]
impl PruningStatistics for PartitionPruningStatistics {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let index = self.partition_schema.index_of(column.name()).ok()?;
self.partition_values.get(index).and_then(|v| {
if v.is_empty() || v.null_count() == v.len() {
// If the array is empty or all nulls, return None
None
} else {
// Otherwise, return the array as is
Some(Arc::clone(v))
}
})
}
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
self.min_values(column)
}
fn num_containers(&self) -> usize {
self.num_containers
}
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
}
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
None
}
fn contained(
&self,
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
let index = self.partition_schema.index_of(column.name()).ok()?;
let array = self.partition_values.get(index)?;
let boolean_array = values.iter().try_fold(None, |acc, v| {
let arrow_value = v.to_scalar().ok()?;
let eq_result = arrow::compute::kernels::cmp::eq(array, &arrow_value).ok()?;
match acc {
None => Some(Some(eq_result)),
Some(acc_array) => {
arrow::compute::kernels::boolean::or_kleene(&acc_array, &eq_result)
.map(Some)
.ok()
}
}
})??;
// If the boolean array is empty or all null values, return None
if boolean_array.is_empty() || boolean_array.null_count() == boolean_array.len() {
None
} else {
Some(boolean_array)
}
}
}
/// Prune a set of containers represented by their statistics.
///
/// Each [`Statistics`] represents a "container" -- some collection of data
/// that has statistics of its columns.
///
/// It is up to the caller to decide what each container represents. For
/// example, they can come from a file (e.g. [`PartitionedFile`]) or a set of of
/// files (e.g. [`FileGroup`])
///
/// [`PartitionedFile`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.PartitionedFile.html
/// [`FileGroup`]: https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.FileGroup.html
#[derive(Clone)]
pub struct PrunableStatistics {
/// Statistics for each container.
/// These are taken as a reference since they may be rather large / expensive to clone
/// and we often won't return all of them as ArrayRefs (we only return the columns the predicate requests).
statistics: Vec<Arc<Statistics>>,
/// The schema of the file these statistics are for.
schema: SchemaRef,
}
impl PrunableStatistics {
/// Create a new instance of [`PrunableStatistics`].
/// Each [`Statistics`] represents a container (e.g. a file or a partition of files).
/// The `schema` is the schema of the data in the containers and should apply to all files.
pub fn new(statistics: Vec<Arc<Statistics>>, schema: SchemaRef) -> Self {
Self { statistics, schema }
}
fn get_exact_column_statistics(
&self,
column: &Column,
get_stat: impl Fn(&ColumnStatistics) -> &Precision<ScalarValue>,
) -> Option<ArrayRef> {
let index = self.schema.index_of(column.name()).ok()?;
let mut has_value = false;
match ScalarValue::iter_to_array(self.statistics.iter().map(|s| {
s.column_statistics
.get(index)
.and_then(|stat| {
if let Precision::Exact(min) = get_stat(stat) {
has_value = true;
Some(min.clone())
} else {
None
}
})
.unwrap_or(ScalarValue::Null)
})) {
// If there is any non-null value and no errors, return the array
Ok(array) => has_value.then_some(array),
Err(_) => {
log::warn!(
"Failed to convert min values to array for column {}",
column.name()
);
None
}
}
}
}
impl PruningStatistics for PrunableStatistics {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
self.get_exact_column_statistics(column, |stat| &stat.min_value)
}
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
self.get_exact_column_statistics(column, |stat| &stat.max_value)
}
fn num_containers(&self) -> usize {
self.statistics.len()
}
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let index = self.schema.index_of(column.name()).ok()?;
if self.statistics.iter().any(|s| {
s.column_statistics
.get(index)
.is_some_and(|stat| stat.null_count.is_exact().unwrap_or(false))
}) {
Some(Arc::new(
self.statistics
.iter()
.map(|s| {
s.column_statistics.get(index).and_then(|stat| {
if let Precision::Exact(null_count) = &stat.null_count {
u64::try_from(*null_count).ok()
} else {
None
}
})
})
.collect::<UInt64Array>(),
))
} else {
None
}
}
fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
// If the column does not exist in the schema, return None
if self.schema.index_of(column.name()).is_err() {
return None;
}
if self
.statistics
.iter()
.any(|s| s.num_rows.is_exact().unwrap_or(false))
{
Some(Arc::new(
self.statistics
.iter()
.map(|s| {
if let Precision::Exact(row_count) = &s.num_rows {
u64::try_from(*row_count).ok()
} else {
None
}
})
.collect::<UInt64Array>(),
))
} else {
None
}
}
fn contained(
&self,
_column: &Column,
_values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
None
}
}
/// Combine multiple [`PruningStatistics`] into a single
/// [`CompositePruningStatistics`].
/// This can be used to combine statistics from different sources,
/// for example partition values and file statistics.
/// This allows pruning with filters that depend on multiple sources of statistics,
/// such as `WHERE partition_col = data_col`.
/// This is done by iterating over the statistics and returning the first
/// one that has information for the requested column.
/// If multiple statistics have information for the same column,
/// the first one is returned without any regard for completeness or accuracy.
/// That is: if the first statistics has information for a column, even if it is incomplete,
/// that is returned even if a later statistics has more complete information.
#[deprecated(
since = "52.0.0",
note = "This struct is no longer used internally. It may be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first. Please open an issue if you have a use case for it."
)]
pub struct CompositePruningStatistics {
pub statistics: Vec<Box<dyn PruningStatistics>>,
}
#[expect(deprecated)]
impl CompositePruningStatistics {
/// Create a new instance of [`CompositePruningStatistics`] from
/// a vector of [`PruningStatistics`].
pub fn new(statistics: Vec<Box<dyn PruningStatistics>>) -> Self {
assert!(!statistics.is_empty());
// Check that all statistics have the same number of containers
let num_containers = statistics[0].num_containers();
for stats in &statistics {
assert_eq!(num_containers, stats.num_containers());
}
Self { statistics }
}
}
#[expect(deprecated)]
impl PruningStatistics for CompositePruningStatistics {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
for stats in &self.statistics {
if let Some(array) = stats.min_values(column) {
return Some(array);
}
}
None
}
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
for stats in &self.statistics {
if let Some(array) = stats.max_values(column) {
return Some(array);
}
}
None
}
fn num_containers(&self) -> usize {
self.statistics[0].num_containers()
}
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
for stats in &self.statistics {
if let Some(array) = stats.null_counts(column) {
return Some(array);
}
}
None
}
fn row_counts(&self, column: &Column) -> Option<ArrayRef> {
for stats in &self.statistics {
if let Some(array) = stats.row_counts(column) {
return Some(array);
}
}
None
}
fn contained(
&self,
column: &Column,
values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
for stats in &self.statistics {
if let Some(array) = stats.contained(column, values) {
return Some(array);
}
}
None
}
}
#[cfg(test)]
#[expect(deprecated)]
mod tests {
use crate::{
ColumnStatistics,
cast::{as_int32_array, as_uint64_array},
};
use super::*;
use arrow::datatypes::{DataType, Field};
use std::sync::Arc;
/// return a PartitionPruningStatistics for two columns 'a' and 'b'
/// and the following stats
///
/// | a | b |
/// | - | - |
/// | 1 | 2 |
/// | 3 | 4 |
fn partition_pruning_statistics_setup() -> PartitionPruningStatistics {
let partition_values = vec![
vec![ScalarValue::from(1i32), ScalarValue::from(2i32)],
vec![ScalarValue::from(3i32), ScalarValue::from(4i32)],
];
let partition_fields = vec![
Arc::new(Field::new("a", DataType::Int32, false)),
Arc::new(Field::new("b", DataType::Int32, false)),
];
PartitionPruningStatistics::try_new(partition_values, partition_fields).unwrap()
}
#[test]
fn test_partition_pruning_statistics() {
let partition_stats = partition_pruning_statistics_setup();
let column_a = Column::new_unqualified("a");
let column_b = Column::new_unqualified("b");
// Partition values don't know anything about nulls or row counts
assert!(partition_stats.null_counts(&column_a).is_none());
assert!(partition_stats.row_counts(&column_a).is_none());
assert!(partition_stats.null_counts(&column_b).is_none());
assert!(partition_stats.row_counts(&column_b).is_none());
// Min/max values are the same as the partition values
let min_values_a =
as_int32_array(&partition_stats.min_values(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_a = vec![Some(1), Some(3)];
assert_eq!(min_values_a, expected_values_a);
let max_values_a =
as_int32_array(&partition_stats.max_values(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_a = vec![Some(1), Some(3)];
assert_eq!(max_values_a, expected_values_a);
let min_values_b =
as_int32_array(&partition_stats.min_values(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_b = vec![Some(2), Some(4)];
assert_eq!(min_values_b, expected_values_b);
let max_values_b =
as_int32_array(&partition_stats.max_values(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_b = vec![Some(2), Some(4)];
assert_eq!(max_values_b, expected_values_b);
// Contained values are only true for the partition values
let values = HashSet::from([ScalarValue::from(1i32)]);
let contained_a = partition_stats.contained(&column_a, &values).unwrap();
let expected_contained_a = BooleanArray::from(vec![true, false]);
assert_eq!(contained_a, expected_contained_a);
let contained_b = partition_stats.contained(&column_b, &values).unwrap();
let expected_contained_b = BooleanArray::from(vec![false, false]);
assert_eq!(contained_b, expected_contained_b);
// The number of containers is the length of the partition values
assert_eq!(partition_stats.num_containers(), 2);
}
#[test]
fn test_partition_pruning_statistics_multiple_positive_values() {
let partition_stats = partition_pruning_statistics_setup();
let column_a = Column::new_unqualified("a");
// The two containers have `a` values 1 and 3, so they both only contain values from 1 and 3
let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(3i32)]);
let contained_a = partition_stats.contained(&column_a, &values).unwrap();
let expected_contained_a = BooleanArray::from(vec![true, true]);
assert_eq!(contained_a, expected_contained_a);
}
#[test]
fn test_partition_pruning_statistics_multiple_negative_values() {
let partition_stats = partition_pruning_statistics_setup();
let column_a = Column::new_unqualified("a");
// The two containers have `a` values 1 and 3,
// so the first contains ONLY values from 1,2
// but the second does not
let values = HashSet::from([ScalarValue::from(1i32), ScalarValue::from(2i32)]);
let contained_a = partition_stats.contained(&column_a, &values).unwrap();
let expected_contained_a = BooleanArray::from(vec![true, false]);
assert_eq!(contained_a, expected_contained_a);
}
#[test]
fn test_partition_pruning_statistics_null_in_values() {
let partition_values = vec![
vec![
ScalarValue::from(1i32),
ScalarValue::from(2i32),
ScalarValue::from(3i32),
],
vec![
ScalarValue::from(4i32),
ScalarValue::from(5i32),
ScalarValue::from(6i32),
],
];
let partition_fields = vec![
Arc::new(Field::new("a", DataType::Int32, false)),
Arc::new(Field::new("b", DataType::Int32, false)),
Arc::new(Field::new("c", DataType::Int32, false)),
];
let partition_stats =
PartitionPruningStatistics::try_new(partition_values, partition_fields)
.unwrap();
let column_a = Column::new_unqualified("a");
let column_b = Column::new_unqualified("b");
let column_c = Column::new_unqualified("c");
let values_a = HashSet::from([ScalarValue::from(1i32), ScalarValue::Int32(None)]);
let contained_a = partition_stats.contained(&column_a, &values_a).unwrap();
let mut builder = BooleanArray::builder(2);
builder.append_value(true);
builder.append_null();
let expected_contained_a = builder.finish();
assert_eq!(contained_a, expected_contained_a);
// First match creates a NULL boolean array
// The accumulator should update the value to true for the second value
let values_b = HashSet::from([ScalarValue::Int32(None), ScalarValue::from(5i32)]);
let contained_b = partition_stats.contained(&column_b, &values_b).unwrap();
let mut builder = BooleanArray::builder(2);
builder.append_null();
builder.append_value(true);
let expected_contained_b = builder.finish();
assert_eq!(contained_b, expected_contained_b);
// All matches are null, contained should return None
let values_c = HashSet::from([ScalarValue::Int32(None)]);
let contained_c = partition_stats.contained(&column_c, &values_c);
assert!(contained_c.is_none());
}
#[test]
fn test_partition_pruning_statistics_empty() {
let partition_values = vec![];
let partition_fields = vec![
Arc::new(Field::new("a", DataType::Int32, false)),
Arc::new(Field::new("b", DataType::Int32, false)),
];
let partition_stats =
PartitionPruningStatistics::try_new(partition_values, partition_fields)
.unwrap();
let column_a = Column::new_unqualified("a");
let column_b = Column::new_unqualified("b");
// Partition values don't know anything about nulls or row counts
assert!(partition_stats.null_counts(&column_a).is_none());
assert!(partition_stats.row_counts(&column_a).is_none());
assert!(partition_stats.null_counts(&column_b).is_none());
assert!(partition_stats.row_counts(&column_b).is_none());
// Min/max values are all missing
assert!(partition_stats.min_values(&column_a).is_none());
assert!(partition_stats.max_values(&column_a).is_none());
assert!(partition_stats.min_values(&column_b).is_none());
assert!(partition_stats.max_values(&column_b).is_none());
// Contained values are all empty
let values = HashSet::from([ScalarValue::from(1i32)]);
assert!(partition_stats.contained(&column_a, &values).is_none());
}
#[test]
fn test_statistics_pruning_statistics() {
let statistics = vec![
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(0i32)))
.with_max_value(Precision::Exact(ScalarValue::from(100i32)))
.with_null_count(Precision::Exact(0)),
)
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(100i32)))
.with_max_value(Precision::Exact(ScalarValue::from(200i32)))
.with_null_count(Precision::Exact(5)),
)
.with_num_rows(Precision::Exact(100)),
),
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(50i32)))
.with_max_value(Precision::Exact(ScalarValue::from(300i32)))
.with_null_count(Precision::Exact(10)),
)
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(200i32)))
.with_max_value(Precision::Exact(ScalarValue::from(400i32)))
.with_null_count(Precision::Exact(0)),
)
.with_num_rows(Precision::Exact(200)),
),
];
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));
let pruning_stats = PrunableStatistics::new(statistics, schema);
let column_a = Column::new_unqualified("a");
let column_b = Column::new_unqualified("b");
// Min/max values are the same as the statistics
let min_values_a = as_int32_array(&pruning_stats.min_values(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_a = vec![Some(0), Some(50)];
assert_eq!(min_values_a, expected_values_a);
let max_values_a = as_int32_array(&pruning_stats.max_values(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_a = vec![Some(100), Some(300)];
assert_eq!(max_values_a, expected_values_a);
let min_values_b = as_int32_array(&pruning_stats.min_values(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_b = vec![Some(100), Some(200)];
assert_eq!(min_values_b, expected_values_b);
let max_values_b = as_int32_array(&pruning_stats.max_values(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_b = vec![Some(200), Some(400)];
assert_eq!(max_values_b, expected_values_b);
// Null counts are the same as the statistics
let null_counts_a =
as_uint64_array(&pruning_stats.null_counts(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_null_counts_a = vec![Some(0), Some(10)];
assert_eq!(null_counts_a, expected_null_counts_a);
let null_counts_b =
as_uint64_array(&pruning_stats.null_counts(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_null_counts_b = vec![Some(5), Some(0)];
assert_eq!(null_counts_b, expected_null_counts_b);
// Row counts are the same as the statistics
let row_counts_a = as_uint64_array(&pruning_stats.row_counts(&column_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts_a = vec![Some(100), Some(200)];
assert_eq!(row_counts_a, expected_row_counts_a);
let row_counts_b = as_uint64_array(&pruning_stats.row_counts(&column_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts_b = vec![Some(100), Some(200)];
assert_eq!(row_counts_b, expected_row_counts_b);
// Contained values are all null/missing (we can't know this just from statistics)
let values = HashSet::from([ScalarValue::from(0i32)]);
assert!(pruning_stats.contained(&column_a, &values).is_none());
assert!(pruning_stats.contained(&column_b, &values).is_none());
// The number of containers is the length of the statistics
assert_eq!(pruning_stats.num_containers(), 2);
// Test with a column that has no statistics
let column_c = Column::new_unqualified("c");
assert!(pruning_stats.min_values(&column_c).is_none());
assert!(pruning_stats.max_values(&column_c).is_none());
assert!(pruning_stats.null_counts(&column_c).is_none());
// Since row counts uses the first column that has row counts we get them back even
// if this columns does not have them set.
// This is debatable, personally I think `row_count` should not take a `Column` as an argument
// at all since all columns should have the same number of rows.
// But for now we just document the current behavior in this test.
let row_counts_c = as_uint64_array(&pruning_stats.row_counts(&column_c).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts_c = vec![Some(100), Some(200)];
assert_eq!(row_counts_c, expected_row_counts_c);
assert!(pruning_stats.contained(&column_c, &values).is_none());
// Test with a column that doesn't exist
let column_d = Column::new_unqualified("d");
assert!(pruning_stats.min_values(&column_d).is_none());
assert!(pruning_stats.max_values(&column_d).is_none());
assert!(pruning_stats.null_counts(&column_d).is_none());
assert!(pruning_stats.row_counts(&column_d).is_none());
assert!(pruning_stats.contained(&column_d, &values).is_none());
}
#[test]
fn test_statistics_pruning_statistics_empty() {
let statistics = vec![];
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Int32, false),
Field::new("c", DataType::Int32, false),
]));
let pruning_stats = PrunableStatistics::new(statistics, schema);
let column_a = Column::new_unqualified("a");
let column_b = Column::new_unqualified("b");
// Min/max values are all missing
assert!(pruning_stats.min_values(&column_a).is_none());
assert!(pruning_stats.max_values(&column_a).is_none());
assert!(pruning_stats.min_values(&column_b).is_none());
assert!(pruning_stats.max_values(&column_b).is_none());
// Null counts are all missing
assert!(pruning_stats.null_counts(&column_a).is_none());
assert!(pruning_stats.null_counts(&column_b).is_none());
// Row counts are all missing
assert!(pruning_stats.row_counts(&column_a).is_none());
assert!(pruning_stats.row_counts(&column_b).is_none());
// Contained values are all empty
let values = HashSet::from([ScalarValue::from(1i32)]);
assert!(pruning_stats.contained(&column_a, &values).is_none());
}
#[test]
fn test_composite_pruning_statistics_partition_and_file() {
// Create partition statistics
let partition_values = vec![
vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
];
let partition_fields = vec![
Arc::new(Field::new("part_a", DataType::Int32, false)),
Arc::new(Field::new("part_b", DataType::Int32, false)),
];
let partition_stats =
PartitionPruningStatistics::try_new(partition_values, partition_fields)
.unwrap();
// Create file statistics
let file_statistics = vec![
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(100i32)))
.with_max_value(Precision::Exact(ScalarValue::from(200i32)))
.with_null_count(Precision::Exact(0)),
)
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(300i32)))
.with_max_value(Precision::Exact(ScalarValue::from(400i32)))
.with_null_count(Precision::Exact(5)),
)
.with_num_rows(Precision::Exact(100)),
),
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(500i32)))
.with_max_value(Precision::Exact(ScalarValue::from(600i32)))
.with_null_count(Precision::Exact(10)),
)
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(700i32)))
.with_max_value(Precision::Exact(ScalarValue::from(800i32)))
.with_null_count(Precision::Exact(0)),
)
.with_num_rows(Precision::Exact(200)),
),
];
let file_schema = Arc::new(Schema::new(vec![
Field::new("col_x", DataType::Int32, false),
Field::new("col_y", DataType::Int32, false),
]));
let file_stats = PrunableStatistics::new(file_statistics, file_schema);
// Create composite statistics
let composite_stats = CompositePruningStatistics::new(vec![
Box::new(partition_stats),
Box::new(file_stats),
]);
// Test accessing columns that are only in partition statistics
let part_a = Column::new_unqualified("part_a");
let part_b = Column::new_unqualified("part_b");
// Test accessing columns that are only in file statistics
let col_x = Column::new_unqualified("col_x");
let col_y = Column::new_unqualified("col_y");
// For partition columns, should get values from partition statistics
let min_values_part_a =
as_int32_array(&composite_stats.min_values(&part_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_part_a = vec![Some(1), Some(2)];
assert_eq!(min_values_part_a, expected_values_part_a);
let max_values_part_a =
as_int32_array(&composite_stats.max_values(&part_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
// For partition values, min and max are the same
assert_eq!(max_values_part_a, expected_values_part_a);
let min_values_part_b =
as_int32_array(&composite_stats.min_values(&part_b).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_part_b = vec![Some(10), Some(20)];
assert_eq!(min_values_part_b, expected_values_part_b);
// For file columns, should get values from file statistics
let min_values_col_x =
as_int32_array(&composite_stats.min_values(&col_x).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_col_x = vec![Some(100), Some(500)];
assert_eq!(min_values_col_x, expected_values_col_x);
let max_values_col_x =
as_int32_array(&composite_stats.max_values(&col_x).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_max_values_col_x = vec![Some(200), Some(600)];
assert_eq!(max_values_col_x, expected_max_values_col_x);
let min_values_col_y =
as_int32_array(&composite_stats.min_values(&col_y).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_values_col_y = vec![Some(300), Some(700)];
assert_eq!(min_values_col_y, expected_values_col_y);
// Test null counts - only available from file statistics
assert!(composite_stats.null_counts(&part_a).is_none());
assert!(composite_stats.null_counts(&part_b).is_none());
let null_counts_col_x =
as_uint64_array(&composite_stats.null_counts(&col_x).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_null_counts_col_x = vec![Some(0), Some(10)];
assert_eq!(null_counts_col_x, expected_null_counts_col_x);
// Test row counts - only available from file statistics
assert!(composite_stats.row_counts(&part_a).is_none());
let row_counts_col_x =
as_uint64_array(&composite_stats.row_counts(&col_x).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts = vec![Some(100), Some(200)];
assert_eq!(row_counts_col_x, expected_row_counts);
// Test contained values - only available from partition statistics
let values = HashSet::from([ScalarValue::from(1i32)]);
let contained_part_a = composite_stats.contained(&part_a, &values).unwrap();
let expected_contained_part_a = BooleanArray::from(vec![true, false]);
assert_eq!(contained_part_a, expected_contained_part_a);
// File statistics don't implement contained
assert!(composite_stats.contained(&col_x, &values).is_none());
// Non-existent column should return None for everything
let non_existent = Column::new_unqualified("non_existent");
assert!(composite_stats.min_values(&non_existent).is_none());
assert!(composite_stats.max_values(&non_existent).is_none());
assert!(composite_stats.null_counts(&non_existent).is_none());
assert!(composite_stats.row_counts(&non_existent).is_none());
assert!(composite_stats.contained(&non_existent, &values).is_none());
// Verify num_containers matches
assert_eq!(composite_stats.num_containers(), 2);
}
#[test]
fn test_composite_pruning_statistics_priority() {
// Create two sets of file statistics with the same column names
// but different values to test that the first one gets priority
// First set of statistics
let first_statistics = vec![
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(100i32)))
.with_max_value(Precision::Exact(ScalarValue::from(200i32)))
.with_null_count(Precision::Exact(0)),
)
.with_num_rows(Precision::Exact(100)),
),
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(300i32)))
.with_max_value(Precision::Exact(ScalarValue::from(400i32)))
.with_null_count(Precision::Exact(5)),
)
.with_num_rows(Precision::Exact(200)),
),
];
let first_schema = Arc::new(Schema::new(vec![Field::new(
"col_a",
DataType::Int32,
false,
)]));
let first_stats = PrunableStatistics::new(first_statistics, first_schema);
// Second set of statistics with the same column name but different values
let second_statistics = vec![
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(1000i32)))
.with_max_value(Precision::Exact(ScalarValue::from(2000i32)))
.with_null_count(Precision::Exact(10)),
)
.with_num_rows(Precision::Exact(1000)),
),
Arc::new(
Statistics::default()
.add_column_statistics(
ColumnStatistics::new_unknown()
.with_min_value(Precision::Exact(ScalarValue::from(3000i32)))
.with_max_value(Precision::Exact(ScalarValue::from(4000i32)))
.with_null_count(Precision::Exact(20)),
)
.with_num_rows(Precision::Exact(2000)),
),
];
let second_schema = Arc::new(Schema::new(vec![Field::new(
"col_a",
DataType::Int32,
false,
)]));
let second_stats = PrunableStatistics::new(second_statistics, second_schema);
// Create composite statistics with first stats having priority
let composite_stats = CompositePruningStatistics::new(vec![
Box::new(first_stats.clone()),
Box::new(second_stats.clone()),
]);
let col_a = Column::new_unqualified("col_a");
// Should get values from first statistics since it has priority
let min_values = as_int32_array(&composite_stats.min_values(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_min_values = vec![Some(100), Some(300)];
assert_eq!(min_values, expected_min_values);
let max_values = as_int32_array(&composite_stats.max_values(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_max_values = vec![Some(200), Some(400)];
assert_eq!(max_values, expected_max_values);
let null_counts = as_uint64_array(&composite_stats.null_counts(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_null_counts = vec![Some(0), Some(5)];
assert_eq!(null_counts, expected_null_counts);
let row_counts = as_uint64_array(&composite_stats.row_counts(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts = vec![Some(100), Some(200)];
assert_eq!(row_counts, expected_row_counts);
// Create composite statistics with second stats having priority
// Now that we've added Clone trait to PrunableStatistics, we can just clone them
let composite_stats_reversed = CompositePruningStatistics::new(vec![
Box::new(second_stats.clone()),
Box::new(first_stats.clone()),
]);
// Should get values from second statistics since it now has priority
let min_values =
as_int32_array(&composite_stats_reversed.min_values(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_min_values = vec![Some(1000), Some(3000)];
assert_eq!(min_values, expected_min_values);
let max_values =
as_int32_array(&composite_stats_reversed.max_values(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_max_values = vec![Some(2000), Some(4000)];
assert_eq!(max_values, expected_max_values);
let null_counts =
as_uint64_array(&composite_stats_reversed.null_counts(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_null_counts = vec![Some(10), Some(20)];
assert_eq!(null_counts, expected_null_counts);
let row_counts =
as_uint64_array(&composite_stats_reversed.row_counts(&col_a).unwrap())
.unwrap()
.into_iter()
.collect::<Vec<_>>();
let expected_row_counts = vec![Some(1000), Some(2000)];
assert_eq!(row_counts, expected_row_counts);
}
#[test]
fn test_composite_pruning_statistics_empty_and_mismatched_containers() {
// Test with empty statistics vector
// This should never happen, so we panic instead of returning a Result which would burned callers
let result = std::panic::catch_unwind(|| {
CompositePruningStatistics::new(vec![]);
});
assert!(result.is_err());
// We should panic here because the number of containers is different
let result = std::panic::catch_unwind(|| {
// Create statistics with different number of containers
// Use partition stats for the test
let partition_values_1 = vec![
vec![ScalarValue::from(1i32), ScalarValue::from(10i32)],
vec![ScalarValue::from(2i32), ScalarValue::from(20i32)],
];
let partition_fields_1 = vec![
Arc::new(Field::new("part_a", DataType::Int32, false)),
Arc::new(Field::new("part_b", DataType::Int32, false)),
];
let partition_stats_1 = PartitionPruningStatistics::try_new(
partition_values_1,
partition_fields_1,
)
.unwrap();
let partition_values_2 = vec![
vec![ScalarValue::from(3i32), ScalarValue::from(30i32)],
vec![ScalarValue::from(4i32), ScalarValue::from(40i32)],
vec![ScalarValue::from(5i32), ScalarValue::from(50i32)],
];
let partition_fields_2 = vec![
Arc::new(Field::new("part_x", DataType::Int32, false)),
Arc::new(Field::new("part_y", DataType::Int32, false)),
];
let partition_stats_2 = PartitionPruningStatistics::try_new(
partition_values_2,
partition_fields_2,
)
.unwrap();
CompositePruningStatistics::new(vec![
Box::new(partition_stats_1),
Box::new(partition_stats_2),
]);
});
assert!(result.is_err());
}
}