blob: c658a8eddc23320c2efd90aa5aa691d1f22b2747 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use crate::expressions::{self, Column};
use crate::{create_physical_expr, LexOrdering, PhysicalSortExpr};
use arrow::compute::SortOptions;
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{plan_err, Result};
use datafusion_common::{DFSchema, HashMap};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, SortExpr};
use itertools::izip;
// Exports:
pub(crate) use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
/// Adds the `offset` value to `Column` indices inside `expr`. This function is
/// generally used during the update of the right table schema in join operations.
pub fn add_offset_to_expr(
expr: Arc<dyn PhysicalExpr>,
offset: isize,
) -> Result<Arc<dyn PhysicalExpr>> {
expr.transform_down(|e| match e.as_any().downcast_ref::<Column>() {
Some(col) => {
let Some(idx) = col.index().checked_add_signed(offset) else {
return plan_err!("Column index overflow");
};
Ok(Transformed::yes(Arc::new(Column::new(col.name(), idx))))
}
None => Ok(Transformed::no(e)),
})
.data()
}
/// This function is similar to the `contains` method of `Vec`. It finds
/// whether `expr` is among `physical_exprs`.
pub fn physical_exprs_contains(
physical_exprs: &[Arc<dyn PhysicalExpr>],
expr: &Arc<dyn PhysicalExpr>,
) -> bool {
physical_exprs
.iter()
.any(|physical_expr| physical_expr.eq(expr))
}
/// Checks whether the given physical expression slices are equal.
pub fn physical_exprs_equal(
lhs: &[Arc<dyn PhysicalExpr>],
rhs: &[Arc<dyn PhysicalExpr>],
) -> bool {
lhs.len() == rhs.len() && izip!(lhs, rhs).all(|(lhs, rhs)| lhs.eq(rhs))
}
/// Checks whether the given physical expression slices are equal in the sense
/// of bags (multi-sets), disregarding their orderings.
pub fn physical_exprs_bag_equal(
lhs: &[Arc<dyn PhysicalExpr>],
rhs: &[Arc<dyn PhysicalExpr>],
) -> bool {
let mut multi_set_lhs: HashMap<_, usize> = HashMap::new();
let mut multi_set_rhs: HashMap<_, usize> = HashMap::new();
for expr in lhs {
*multi_set_lhs.entry(expr).or_insert(0) += 1;
}
for expr in rhs {
*multi_set_rhs.entry(expr).or_insert(0) += 1;
}
multi_set_lhs == multi_set_rhs
}
/// Converts logical sort expressions to physical sort expressions.
///
/// This function transforms a collection of logical sort expressions into their
/// physical representation that can be used during query execution.
///
/// # Arguments
///
/// * `schema` - The schema containing column definitions.
/// * `sort_order` - A collection of logical sort expressions grouped into
/// lexicographic orderings.
///
/// # Returns
///
/// A vector of lexicographic orderings for physical execution, or an error if
/// the transformation fails.
///
/// # Examples
///
/// ```
/// // Create orderings from columns "id" and "name"
/// # use arrow::datatypes::{Schema, Field, DataType};
/// # use datafusion_physical_expr::create_ordering;
/// # use datafusion_common::Column;
/// # use datafusion_expr::{Expr, SortExpr};
/// #
/// // Create a schema with two fields
/// let schema = Schema::new(vec![
/// Field::new("id", DataType::Int32, false),
/// Field::new("name", DataType::Utf8, false),
/// ]);
///
/// let sort_exprs = vec![
/// vec![SortExpr {
/// expr: Expr::Column(Column::new(Some("t"), "id")),
/// asc: true,
/// nulls_first: false,
/// }],
/// vec![SortExpr {
/// expr: Expr::Column(Column::new(Some("t"), "name")),
/// asc: false,
/// nulls_first: true,
/// }],
/// ];
/// let result = create_ordering(&schema, &sort_exprs).unwrap();
/// ```
pub fn create_ordering(
schema: &Schema,
sort_order: &[Vec<SortExpr>],
) -> Result<Vec<LexOrdering>> {
let mut all_sort_orders = vec![];
for (group_idx, exprs) in sort_order.iter().enumerate() {
// Construct PhysicalSortExpr objects from Expr objects:
let mut sort_exprs = vec![];
for (expr_idx, sort) in exprs.iter().enumerate() {
match &sort.expr {
Expr::Column(col) => match expressions::col(&col.name, schema) {
Ok(expr) => {
let opts = SortOptions::new(!sort.asc, sort.nulls_first);
sort_exprs.push(PhysicalSortExpr::new(expr, opts));
}
// Cannot find expression in the projected_schema, stop iterating
// since rest of the orderings are violated
Err(_) => break,
},
expr => {
return plan_err!(
"Expected single column reference in sort_order[{}][{}], got {}",
group_idx,
expr_idx,
expr
);
}
}
}
all_sort_orders.extend(LexOrdering::new(sort_exprs));
}
Ok(all_sort_orders)
}
/// Creates a vector of [LexOrdering] from a vector of logical expression
pub fn create_lex_ordering(
schema: &SchemaRef,
sort_order: &[Vec<SortExpr>],
execution_props: &ExecutionProps,
) -> Result<Vec<LexOrdering>> {
// Try the fast path that only supports column references first
// This avoids creating a DFSchema
if let Ok(ordering) = create_ordering(schema, sort_order) {
return Ok(ordering);
}
let df_schema = DFSchema::try_from(Arc::clone(schema))?;
let mut all_sort_orders = vec![];
for exprs in sort_order.iter() {
all_sort_orders.extend(LexOrdering::new(create_physical_sort_exprs(
exprs,
&df_schema,
execution_props,
)?));
}
Ok(all_sort_orders)
}
/// Create a physical sort expression from a logical expression
pub fn create_physical_sort_expr(
e: &SortExpr,
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<PhysicalSortExpr> {
create_physical_expr(&e.expr, input_dfschema, execution_props).map(|expr| {
let options = SortOptions::new(!e.asc, e.nulls_first);
PhysicalSortExpr::new(expr, options)
})
}
/// Create vector of physical sort expression from a vector of logical expression
pub fn create_physical_sort_exprs(
exprs: &[SortExpr],
input_dfschema: &DFSchema,
execution_props: &ExecutionProps,
) -> Result<Vec<PhysicalSortExpr>> {
exprs
.iter()
.map(|e| create_physical_sort_expr(e, input_dfschema, execution_props))
.collect()
}
pub fn add_offset_to_physical_sort_exprs(
sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
offset: isize,
) -> Result<Vec<PhysicalSortExpr>> {
sort_exprs
.into_iter()
.map(|mut sort_expr| {
sort_expr.expr = add_offset_to_expr(sort_expr.expr, offset)?;
Ok(sort_expr)
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::expressions::{BinaryExpr, Column, Literal};
use crate::physical_expr::{
physical_exprs_bag_equal, physical_exprs_contains, physical_exprs_equal,
};
use datafusion_physical_expr_common::physical_expr::is_volatile;
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::ColumnarValue;
use datafusion_expr::Operator;
use std::any::Any;
use std::fmt;
#[test]
fn test_physical_exprs_contains() {
let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
as Arc<dyn PhysicalExpr>;
let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
as Arc<dyn PhysicalExpr>;
let lit4 =
Arc::new(Literal::new(ScalarValue::Int32(Some(4)))) as Arc<dyn PhysicalExpr>;
let lit2 =
Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
let lit1 =
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
let col_a_expr = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
let col_c_expr = Arc::new(Column::new("c", 2)) as Arc<dyn PhysicalExpr>;
// lit(true), lit(false), lit(4), lit(2), Col(a), Col(b)
let physical_exprs: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::clone(&lit_true),
Arc::clone(&lit_false),
Arc::clone(&lit4),
Arc::clone(&lit2),
Arc::clone(&col_a_expr),
Arc::clone(&col_b_expr),
];
// below expressions are inside physical_exprs
assert!(physical_exprs_contains(&physical_exprs, &lit_true));
assert!(physical_exprs_contains(&physical_exprs, &lit2));
assert!(physical_exprs_contains(&physical_exprs, &col_b_expr));
// below expressions are not inside physical_exprs
assert!(!physical_exprs_contains(&physical_exprs, &col_c_expr));
assert!(!physical_exprs_contains(&physical_exprs, &lit1));
}
#[test]
fn test_physical_exprs_equal() {
let lit_true = Arc::new(Literal::new(ScalarValue::Boolean(Some(true))))
as Arc<dyn PhysicalExpr>;
let lit_false = Arc::new(Literal::new(ScalarValue::Boolean(Some(false))))
as Arc<dyn PhysicalExpr>;
let lit1 =
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))) as Arc<dyn PhysicalExpr>;
let lit2 =
Arc::new(Literal::new(ScalarValue::Int32(Some(2)))) as Arc<dyn PhysicalExpr>;
let col_b_expr = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
let vec1 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
let vec2 = vec![Arc::clone(&lit_true), Arc::clone(&col_b_expr)];
let vec3 = vec![Arc::clone(&lit2), Arc::clone(&lit1)];
let vec4 = vec![Arc::clone(&lit_true), Arc::clone(&lit_false)];
// these vectors are same
assert!(physical_exprs_equal(&vec1, &vec1));
assert!(physical_exprs_equal(&vec1, &vec4));
assert!(physical_exprs_bag_equal(&vec1, &vec1));
assert!(physical_exprs_bag_equal(&vec1, &vec4));
// these vectors are different
assert!(!physical_exprs_equal(&vec1, &vec2));
assert!(!physical_exprs_equal(&vec1, &vec3));
assert!(!physical_exprs_bag_equal(&vec1, &vec2));
assert!(!physical_exprs_bag_equal(&vec1, &vec3));
}
#[test]
fn test_physical_exprs_set_equal() {
let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
];
let list2: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new("b", 1)),
Arc::new(Column::new("b", 1)),
Arc::new(Column::new("a", 0)),
];
assert!(!physical_exprs_bag_equal(
list1.as_slice(),
list2.as_slice()
));
assert!(!physical_exprs_bag_equal(
list2.as_slice(),
list1.as_slice()
));
assert!(!physical_exprs_equal(list1.as_slice(), list2.as_slice()));
assert!(!physical_exprs_equal(list2.as_slice(), list1.as_slice()));
let list3: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
Arc::new(Column::new("c", 2)),
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("b", 1)),
];
let list4: Vec<Arc<dyn PhysicalExpr>> = vec![
Arc::new(Column::new("b", 1)),
Arc::new(Column::new("b", 1)),
Arc::new(Column::new("a", 0)),
Arc::new(Column::new("c", 2)),
Arc::new(Column::new("a", 0)),
];
assert!(physical_exprs_bag_equal(list3.as_slice(), list4.as_slice()));
assert!(physical_exprs_bag_equal(list4.as_slice(), list3.as_slice()));
assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
assert!(!physical_exprs_equal(list3.as_slice(), list4.as_slice()));
assert!(!physical_exprs_equal(list4.as_slice(), list3.as_slice()));
assert!(physical_exprs_bag_equal(list3.as_slice(), list3.as_slice()));
assert!(physical_exprs_bag_equal(list4.as_slice(), list4.as_slice()));
}
#[test]
fn test_is_volatile_default_behavior() {
// Test that default PhysicalExpr implementations are not volatile
let literal =
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
let column = Arc::new(Column::new("test", 0)) as Arc<dyn PhysicalExpr>;
// Test is_volatile_node() - should return false by default
assert!(!literal.is_volatile_node());
assert!(!column.is_volatile_node());
// Test is_volatile() - should return false for non-volatile expressions
assert!(!is_volatile(&literal));
assert!(!is_volatile(&column));
}
/// Mock volatile PhysicalExpr for testing purposes
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct MockVolatileExpr {
volatile: bool,
}
impl MockVolatileExpr {
fn new(volatile: bool) -> Self {
Self { volatile }
}
}
impl fmt::Display for MockVolatileExpr {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "MockVolatile({})", self.volatile)
}
}
impl PhysicalExpr for MockVolatileExpr {
fn as_any(&self) -> &dyn Any {
self
}
fn data_type(&self, _input_schema: &Schema) -> Result<DataType> {
Ok(DataType::Boolean)
}
fn nullable(&self, _input_schema: &Schema) -> Result<bool> {
Ok(false)
}
fn evaluate(&self, _batch: &RecordBatch) -> Result<ColumnarValue> {
Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(
self.volatile,
))))
}
fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![]
}
fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
Ok(self)
}
fn is_volatile_node(&self) -> bool {
self.volatile
}
fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "mock_volatile({})", self.volatile)
}
}
#[test]
fn test_nested_expression_volatility() {
// Test that is_volatile() recursively detects volatility in expression trees
// Create a volatile mock expression
let volatile_expr =
Arc::new(MockVolatileExpr::new(true)) as Arc<dyn PhysicalExpr>;
assert!(volatile_expr.is_volatile_node());
assert!(is_volatile(&volatile_expr));
// Create a non-volatile mock expression
let stable_expr = Arc::new(MockVolatileExpr::new(false)) as Arc<dyn PhysicalExpr>;
assert!(!stable_expr.is_volatile_node());
assert!(!is_volatile(&stable_expr));
// Create a literal (non-volatile)
let literal =
Arc::new(Literal::new(ScalarValue::Int32(Some(42)))) as Arc<dyn PhysicalExpr>;
assert!(!literal.is_volatile_node());
assert!(!is_volatile(&literal));
// Test composite expression: volatile_expr AND literal
// The BinaryExpr itself is not volatile, but contains a volatile child
let composite_expr = Arc::new(BinaryExpr::new(
Arc::clone(&volatile_expr),
Operator::And,
Arc::clone(&literal),
)) as Arc<dyn PhysicalExpr>;
assert!(!composite_expr.is_volatile_node()); // BinaryExpr itself is not volatile
assert!(is_volatile(&composite_expr)); // But it contains a volatile child
// Test composite expression with all non-volatile children
let stable_composite = Arc::new(BinaryExpr::new(
Arc::clone(&stable_expr),
Operator::And,
Arc::clone(&literal),
)) as Arc<dyn PhysicalExpr>;
assert!(!stable_composite.is_volatile_node());
assert!(!is_volatile(&stable_composite)); // No volatile children
}
}