blob: c090cd2bcca91a9a0bf55e4164e9a504cd803c83 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use arrow::array::{ArrayRef, BooleanArray, Int32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::{DFSchema, ScalarValue};
use datafusion::execution::context::ExecutionProps;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::prelude::*;
use std::collections::HashSet;
use std::sync::Arc;
/// This example shows how to use DataFusion's `PruningPredicate` to prove
/// filter expressions can never be true based on statistics such as min/max
/// values of columns.
///
/// The process is called "pruning" and is commonly used in query engines to
/// quickly eliminate entire files / partitions / row groups of data from
/// consideration using statistical information from a catalog or other
/// metadata.
///
/// This example uses a user defined catalog to supply pruning information, as
/// one might do as part of a higher level storage engine. See
/// `parquet_index.rs` for an example that uses pruning in the context of an
/// individual query.
#[tokio::main]
async fn main() {
// In this example, we'll use the PruningPredicate to determine if
// the expression `x = 5 AND y = 10` can never be true based on statistics
// Start with the expression `x = 5 AND y = 10`
let expr = col("x").eq(lit(5)).and(col("y").eq(lit(10)));
// We can analyze this predicate using information provided by the
// `PruningStatistics` trait, in this case we'll use a simple catalog that
// models three files. For all rows in each file:
//
// File 1: x has values between `4` and `6`
// y has the value 10
//
// File 2: x has values between `4` and `6`
// y has the value of `7`
//
// File 3: x has the value 1
// nothing is known about the value of y
let my_catalog = MyCatalog::new();
// Create a `PruningPredicate`.
//
// Note the predicate does not automatically coerce types or simplify
// expressions. See expr_api.rs examples for how to do this if required
let predicate = create_pruning_predicate(expr, &my_catalog.schema);
// Evaluate the predicate for the three files in the catalog
let prune_results = predicate.prune(&my_catalog).unwrap();
println!("Pruning results: {prune_results:?}");
// The result is a `Vec` of bool values, one for each file in the catalog
assert_eq!(
prune_results,
vec![
// File 1: `x = 5 AND y = 10` can evaluate to true if x has values
// between `4` and `6`, y has the value `10`, so the file can not be
// skipped
//
// NOTE this doesn't mean there actually are rows that evaluate to
// true, but the pruning predicate can't prove there aren't any.
true,
// File 2: `x = 5 AND y = 10` can never evaluate to true because y
// has only the value of 7. Thus this file can be skipped.
false,
// File 3: `x = 5 AND y = 10` can never evaluate to true because x
// has the value `1`, and for any value of `y` the expression will
// evaluate to false (`x = 5 AND y = 10 -->` false AND null` -->
// `false`). Thus this file can also be skipped.
false
]
);
}
/// A simple model catalog that has information about the three files that store
/// data for a table with two columns (x and y).
struct MyCatalog {
schema: SchemaRef,
// (min, max) for x
x_values: Vec<(Option<i32>, Option<i32>)>,
// (min, max) for y
y_values: Vec<(Option<i32>, Option<i32>)>,
}
impl MyCatalog {
fn new() -> Self {
MyCatalog {
schema: Arc::new(Schema::new(vec![
Field::new("x", DataType::Int32, false),
Field::new("y", DataType::Int32, false),
])),
x_values: vec![
// File 1: x has values between `4` and `6`
(Some(4), Some(6)),
// File 2: x has values between `4` and `6`
(Some(4), Some(6)),
// File 3: x has the value 1
(Some(1), Some(1)),
],
y_values: vec![
// File 1: y has the value 10
(Some(10), Some(10)),
// File 2: y has the value of `7`
(Some(7), Some(7)),
// File 3: nothing is known about the value of y. This is
// represented as (None, None).
//
// Note, returning null means the value isn't known, NOT
// that we know the entire column is null.
(None, None),
],
}
}
}
/// We communicate the statistical information to DataFusion by implementing the
/// PruningStatistics trait.
impl PruningStatistics for MyCatalog {
fn num_containers(&self) -> usize {
// there are 3 files in this "catalog", and thus each array returned
// from min_values and max_values also has 3 elements
3
}
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
// The pruning predicate evaluates the bounds for multiple expressions
// at once, so return an array with an element for the minimum value in
// each file
match column.name.as_str() {
"x" => Some(i32_array(self.x_values.iter().map(|(min, _)| min))),
"y" => Some(i32_array(self.y_values.iter().map(|(min, _)| min))),
name => panic!("unknown column name: {name}"),
}
}
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
// similarly to min_values, return an array with an element for the
// maximum value in each file
match column.name.as_str() {
"x" => Some(i32_array(self.x_values.iter().map(|(_, max)| max))),
"y" => Some(i32_array(self.y_values.iter().map(|(_, max)| max))),
name => panic!("unknown column name: {name}"),
}
}
fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
// In this example, we know nothing about the number of nulls
None
}
fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
// In this example, we know nothing about the number of rows in each file
None
}
fn contained(
&self,
_column: &Column,
_values: &HashSet<ScalarValue>,
) -> Option<BooleanArray> {
// this method can be used to implement Bloom filter like filtering
// but we do not illustrate that here
None
}
}
fn create_pruning_predicate(expr: Expr, schema: &SchemaRef) -> PruningPredicate {
let df_schema = DFSchema::try_from(schema.as_ref().clone()).unwrap();
let props = ExecutionProps::new();
let physical_expr = create_physical_expr(&expr, &df_schema, &props).unwrap();
PruningPredicate::try_new(physical_expr, schema.clone()).unwrap()
}
fn i32_array<'a>(values: impl Iterator<Item = &'a Option<i32>>) -> ArrayRef {
Arc::new(Int32Array::from_iter(values.cloned()))
}