| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::ops::Range; |
| use std::ptr::NonNull; |
| use std::sync::Arc; |
| |
| use arrow::array::{Array, ArrayData, ArrayRef, make_array}; |
| use datafusion::arrow::datatypes::DataType; |
| use datafusion::arrow::pyarrow::{FromPyArrow, PyArrowType, ToPyArrow}; |
| use datafusion::error::{DataFusionError, Result}; |
| use datafusion::logical_expr::function::{PartitionEvaluatorArgs, WindowUDFFieldArgs}; |
| use datafusion::logical_expr::window_state::WindowAggState; |
| use datafusion::logical_expr::{ |
| PartitionEvaluator, PartitionEvaluatorFactory, Signature, Volatility, WindowUDF, WindowUDFImpl, |
| }; |
| use datafusion::scalar::ScalarValue; |
| use datafusion_ffi::udwf::FFI_WindowUDF; |
| use datafusion_python_util::parse_volatility; |
| use pyo3::exceptions::PyValueError; |
| use pyo3::prelude::*; |
| use pyo3::types::{PyCapsule, PyList, PyTuple}; |
| |
| use crate::common::data_type::PyScalarValue; |
| use crate::errors::{PyDataFusionResult, to_datafusion_err}; |
| use crate::expr::PyExpr; |
| |
/// Adapter that lets a Python object act as a DataFusion partition
/// evaluator. The `PartitionEvaluator` trait implementation forwards each
/// call to a same-named method on the wrapped object.
#[derive(Debug)]
struct RustPartitionEvaluator {
    /// Python object that receives the forwarded method calls.
    evaluator: Py<PyAny>,
}

impl RustPartitionEvaluator {
    /// Wraps the given Python evaluator object.
    fn new(evaluator: Py<PyAny>) -> Self {
        RustPartitionEvaluator { evaluator }
    }
}
| |
/// Converts Arrow arrays into a Python list of PyArrow arrays.
///
/// Any per-array conversion failure is returned as a `PyErr` rather than
/// panicking.
fn arrays_to_py_list<'py>(py: Python<'py>, values: &[ArrayRef]) -> PyResult<Bound<'py, PyList>> {
    let py_arrays = values
        .iter()
        .map(|arg| arg.into_data().to_pyarrow(py))
        .collect::<PyResult<Vec<_>>>()?;
    PyList::new(py, py_arrays)
}

/// Converts a PyArrow array returned by Python into an Arrow [`ArrayRef`].
///
/// Fails with a `PyErr` when the object cannot be imported as Arrow data.
fn array_from_pyarrow(value: &Bound<'_, PyAny>) -> PyResult<ArrayRef> {
    ArrayData::from_pyarrow_bound(value).map(make_array)
}

impl RustPartitionEvaluator {
    /// Calls the zero-argument Python method `method` and extracts a `bool`.
    ///
    /// The trait methods that use this helper cannot report errors, so a
    /// raised exception or a non-bool result is treated as `false`. The
    /// failure is logged at `debug` level so it is not lost entirely.
    fn call_bool_method(&self, method: &str) -> bool {
        Python::attach(|py| {
            self.evaluator
                .bind(py)
                .call_method0(method)
                .and_then(|v| v.extract::<bool>())
                .unwrap_or_else(|e| {
                    log::debug!(
                        target: "datafusion_python::udwf",
                        "window evaluator method {method:?} failed; defaulting to false: {e}",
                    );
                    false
                })
        })
    }
}

/// Forwards every [`PartitionEvaluator`] call to the same-named method on the
/// wrapped Python object, converting arguments and results at the boundary.
impl PartitionEvaluator for RustPartitionEvaluator {
    /// Calls Python `memoize()`. The Rust-side `_state` is not passed to
    /// Python and the Python return value is discarded.
    fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> {
        Python::attach(|py| self.evaluator.bind(py).call_method0("memoize").map(|_| ()))
            .map_err(|e| DataFusionError::Execution(format!("{e}")))
    }

    /// Calls Python `get_range(idx, n_rows)` and converts the returned
    /// 2-tuple `(start, end)` into `start..end`.
    ///
    /// # Errors
    /// Returns an execution error if the call raises, the result is not a
    /// tuple of length 2, or either element is not convertible to `usize`.
    fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> {
        Python::attach(|py| -> PyResult<Range<usize>> {
            let py_args = PyTuple::new(py, vec![idx, n_rows])?;
            let returned = self
                .evaluator
                .bind(py)
                .call_method1("get_range", py_args)?;

            let tuple: Bound<'_, PyTuple> = returned.extract()?;
            if tuple.len() != 2 {
                return Err(PyValueError::new_err(format!(
                    "Expected get_range to return tuple of length 2. Received length {}",
                    tuple.len()
                )));
            }

            // Propagate lookup errors instead of unwrapping.
            let start: usize = tuple.get_item(0)?.extract()?;
            let end: usize = tuple.get_item(1)?.extract()?;

            Ok(start..end)
        })
        .map_err(|e| DataFusionError::Execution(format!("{e}")))
    }

    /// Reports the Python `is_causal()` flag; `false` if the call fails.
    fn is_causal(&self) -> bool {
        self.call_bool_method("is_causal")
    }

    /// Calls Python `evaluate_all(values, num_rows)` with `values` converted
    /// to a list of PyArrow arrays, and converts the result back to Arrow.
    ///
    /// # Errors
    /// Returns an error if an input cannot be exported to PyArrow, the call
    /// raises, or the returned object cannot be imported as Arrow data.
    fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
        Python::attach(|py| {
            let py_values = arrays_to_py_list(py, values)?;
            let py_num_rows = num_rows.into_pyobject(py)?;
            let py_args = PyTuple::new(py, vec![py_values.as_any(), &py_num_rows])?;

            self.evaluator
                .bind(py)
                .call_method1("evaluate_all", py_args)
                .and_then(|v| array_from_pyarrow(&v))
        })
        .map_err(to_datafusion_err)
    }

    /// Calls Python `evaluate(values, (start, end))` and extracts the result
    /// as a [`ScalarValue`].
    ///
    /// # Errors
    /// Returns an error if an input cannot be exported to PyArrow, the call
    /// raises, or the result cannot be extracted as a scalar value.
    fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
        Python::attach(|py| {
            let py_values = arrays_to_py_list(py, values)?;
            let range_tuple = PyTuple::new(py, vec![range.start, range.end])?;
            let py_args = PyTuple::new(py, vec![py_values.as_any(), range_tuple.as_any()])?;

            self.evaluator
                .bind(py)
                .call_method1("evaluate", py_args)
                .and_then(|v| v.extract::<PyScalarValue>())
                .map(|v| v.0)
        })
        .map_err(to_datafusion_err)
    }

    /// Calls Python `evaluate_all_with_rank(num_rows, ranks)` where `ranks`
    /// is a list of `(start, end)` tuples, and converts the result to Arrow.
    ///
    /// # Errors
    /// Returns an error if the call raises or the returned object cannot be
    /// imported as Arrow data.
    fn evaluate_all_with_rank(
        &self,
        num_rows: usize,
        ranks_in_partition: &[Range<usize>],
    ) -> Result<ArrayRef> {
        Python::attach(|py| {
            // 1. Build the Python arguments: an int and a list of
            //    `(start, end)` tuples, one per rank range.
            let ranks = ranks_in_partition
                .iter()
                .map(|r| PyTuple::new(py, vec![r.start, r.end]))
                .collect::<PyResult<Vec<_>>>()?;

            let py_args = vec![
                num_rows.into_pyobject(py)?.into_any(),
                PyList::new(py, ranks)?.into_any(),
            ];
            let py_args = PyTuple::new(py, py_args)?;

            // 2. Call the Python method and import the returned array.
            self.evaluator
                .bind(py)
                .call_method1("evaluate_all_with_rank", py_args)
                .and_then(|v| array_from_pyarrow(&v))
        })
        .map_err(to_datafusion_err)
    }

    /// Reports the Python `supports_bounded_execution()` flag; `false` if
    /// the call fails.
    fn supports_bounded_execution(&self) -> bool {
        self.call_bool_method("supports_bounded_execution")
    }

    /// Reports the Python `uses_window_frame()` flag; `false` if the call
    /// fails.
    fn uses_window_frame(&self) -> bool {
        self.call_bool_method("uses_window_frame")
    }

    /// Reports the Python `include_rank()` flag; `false` if the call fails.
    fn include_rank(&self) -> bool {
        self.call_bool_method("include_rank")
    }
}
| |
/// Calls the Python callable `evaluator` with no arguments and wraps the
/// object it returns in a [`RustPartitionEvaluator`].
///
/// A Python exception raised by the call is reported as
/// `DataFusionError::Execution` carrying the exception's display text.
fn instantiate_partition_evaluator(evaluator: &Py<PyAny>) -> Result<Box<dyn PartitionEvaluator>> {
    let instance: Py<PyAny> = Python::attach(|py| match evaluator.call0(py) {
        Ok(obj) => Ok(obj),
        Err(err) => Err(DataFusionError::Execution(err.to_string())),
    })?;
    let wrapped = RustPartitionEvaluator::new(instance);
    Ok(Box::new(wrapped))
}
| |
/// Turn a Python evaluator factory into a `PartitionEvaluatorFactory`.
///
/// Each invocation of the returned factory calls the Python callable to
/// produce a new evaluator instance.
///
/// Kept for downstream callers that previously used this helper to
/// obtain a [`PartitionEvaluatorFactory`] for factory-based APIs. New
/// in-crate code should build a [`PythonFunctionWindowUDF`] directly so
/// the codec can downcast and ship it inline.
pub fn to_rust_partition_evaluator(evaluator: Py<PyAny>) -> PartitionEvaluatorFactory {
    let factory = move || instantiate_partition_evaluator(&evaluator);
    Arc::new(factory)
}
| |
/// Represents a WindowUDF
#[pyclass(
    from_py_object,
    frozen,
    name = "WindowUDF",
    module = "datafusion",
    subclass
)]
#[derive(Debug, Clone)]
pub struct PyWindowUDF {
    pub(crate) function: WindowUDF,
}

#[pymethods]
impl PyWindowUDF {
    // Builds a window UDF backed by a Python evaluator factory. Fails only
    // when `volatility` is rejected by `parse_volatility`.
    #[new]
    #[pyo3(signature=(name, evaluator, input_types, return_type, volatility))]
    fn new(
        name: &str,
        evaluator: Py<PyAny>,
        input_types: Vec<PyArrowType<DataType>>,
        return_type: PyArrowType<DataType>,
        volatility: &str,
    ) -> PyResult<Self> {
        let return_type = return_type.0;
        let input_types: Vec<DataType> = input_types.into_iter().map(|t| t.0).collect();

        let function = WindowUDF::from(PythonFunctionWindowUDF::new(
            name,
            evaluator,
            input_types,
            return_type,
            parse_volatility(volatility)?,
        ));
        Ok(Self { function })
    }

    /// creates a new PyExpr with the call of the udf
    #[pyo3(signature = (*args))]
    fn __call__(&self, args: Vec<PyExpr>) -> PyResult<PyExpr> {
        // `args` is owned, so move each expression out instead of cloning it.
        let args = args.into_iter().map(|e| e.expr).collect();
        Ok(self.function.call(args).into())
    }

    // Imports a window UDF from a PyCapsule, or from an object exposing
    // `__datafusion_window_udf__()` that returns one. Errors if the object is
    // not a capsule or the capsule pointer check fails.
    #[staticmethod]
    pub fn from_pycapsule(func: Bound<'_, PyAny>) -> PyDataFusionResult<Self> {
        let capsule = if func.hasattr("__datafusion_window_udf__")? {
            func.getattr("__datafusion_window_udf__")?.call0()?
        } else {
            func
        };

        let capsule = capsule.cast::<PyCapsule>().map_err(to_datafusion_err)?;
        let data: NonNull<FFI_WindowUDF> = capsule
            .pointer_checked(Some(c"datafusion_window_udf"))?
            .cast();
        // SAFETY: `pointer_checked` was asked for the capsule named
        // "datafusion_window_udf" and yielded a non-null pointer. The producer
        // of a capsule with that name is expected to have stored an
        // `FFI_WindowUDF` in it (this cannot be verified here), and `capsule`
        // is still alive while the reference is in use.
        let udwf = unsafe { data.as_ref() };
        let udwf: Arc<dyn WindowUDFImpl> = udwf.into();

        Ok(Self {
            function: WindowUDF::new_from_shared_impl(udwf),
        })
    }

    fn __repr__(&self) -> PyResult<String> {
        Ok(format!("WindowUDF({})", self.function.name()))
    }

    #[getter]
    fn name(&self) -> &str {
        self.function.name()
    }
}
| |
/// [`WindowUDFImpl`] backed by a Python-defined window function.
///
/// The Python evaluator factory is stored as-is so the codec can
/// downcast to this type and cloudpickle the factory across process
/// boundaries. This supersedes the earlier factory-erased
/// `MultiColumnWindowUDF`, whose name survives as a type alias below for
/// backward compatibility.
#[derive(Debug)]
pub struct PythonFunctionWindowUDF {
    name: String,
    evaluator: Py<PyAny>,
    signature: Signature,
    return_type: DataType,
}

/// Alias preserving the previous struct name for downstream crates.
/// Prefer [`PythonFunctionWindowUDF`] in new code.
pub type MultiColumnWindowUDF = PythonFunctionWindowUDF;

impl PythonFunctionWindowUDF {
    /// Builds a window UDF called `name` whose signature accepts exactly
    /// `input_types` with the given `volatility`, and whose output field
    /// has type `return_type`.
    pub fn new(
        name: impl Into<String>,
        evaluator: Py<PyAny>,
        input_types: Vec<DataType>,
        return_type: DataType,
        volatility: Volatility,
    ) -> Self {
        Self {
            name: name.into(),
            evaluator,
            signature: Signature::exact(input_types, volatility),
            return_type,
        }
    }

    /// The stored Python callable; it is invoked to create a fresh
    /// partition evaluator instance per partition. The codec reads it to
    /// cloudpickle the evaluator factory across process boundaries.
    pub(crate) fn evaluator(&self) -> &Py<PyAny> {
        &self.evaluator
    }

    /// Data type of the field produced by this UDF.
    pub(crate) fn return_type(&self) -> &DataType {
        &self.return_type
    }
}
| |
| impl Eq for PythonFunctionWindowUDF {} |
| impl PartialEq for PythonFunctionWindowUDF { |
| fn eq(&self, other: &Self) -> bool { |
| self.name == other.name |
| && self.signature == other.signature |
| && self.return_type == other.return_type |
| // Pointer-identity fast path: `Arc`-shared clones of the |
| // same UDF skip the GIL roundtrip. Falls through to Python |
| // `__eq__` only for two distinct callables. |
| && (self.evaluator.as_ptr() == other.evaluator.as_ptr() |
| || Python::attach(|py| { |
| // See `PythonFunctionScalarUDF::eq` for the |
| // rationale on swallowing the exception as `false` |
| // and logging at `debug`. FIXME: revisit if |
| // upstream `WindowUDFImpl` exposes a fallible |
| // `PartialEq`. |
| self.evaluator |
| .bind(py) |
| .eq(other.evaluator.bind(py)) |
| .unwrap_or_else(|e| { |
| log::debug!( |
| target: "datafusion_python::udwf", |
| "PythonFunctionWindowUDF {:?} __eq__ raised; treating as unequal: {e}", |
| self.name, |
| ); |
| false |
| }) |
| })) |
| } |
| } |
| |
| impl std::hash::Hash for PythonFunctionWindowUDF { |
| fn hash<H: std::hash::Hasher>(&self, state: &mut H) { |
| // See `PythonFunctionScalarUDF`'s `Hash` impl for the |
| // rationale: hash the identifying header only and let |
| // `PartialEq` disambiguate evaluators. |
| self.name.hash(state); |
| self.signature.hash(state); |
| self.return_type.hash(state); |
| } |
| } |
| |
impl WindowUDFImpl for PythonFunctionWindowUDF {
    /// Name this UDF was constructed with.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Signature built from the input types and volatility given at
    /// construction.
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Output field: named from `field_args`, typed with the stored return
    /// type, and always marked nullable.
    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<arrow::datatypes::FieldRef> {
        // TODO: Should nullable always be `true`?
        let nullable = true;
        let output_type = self.return_type.clone();
        let output = arrow::datatypes::Field::new(field_args.name(), output_type, nullable);
        Ok(Arc::new(output))
    }

    // TODO: Enable passing partition_evaluator_args to python?
    /// Creates a new partition evaluator by calling the stored Python
    /// callable; the supplied arguments are currently ignored.
    fn partition_evaluator(
        &self,
        _partition_evaluator_args: PartitionEvaluatorArgs,
    ) -> Result<Box<dyn PartitionEvaluator>> {
        let factory = &self.evaluator;
        instantiate_partition_evaluator(factory)
    }
}