blob: ebec8f3bd4b1922096c274cd764560175e2f49c8 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::ops::Range;
use std::ptr::NonNull;
use std::sync::Arc;
use arrow::array::{Array, ArrayData, ArrayRef, make_array};
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::pyarrow::{FromPyArrow, PyArrowType, ToPyArrow};
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::function::{PartitionEvaluatorArgs, WindowUDFFieldArgs};
use datafusion::logical_expr::window_state::WindowAggState;
use datafusion::logical_expr::{
PartitionEvaluator, PartitionEvaluatorFactory, Signature, Volatility, WindowUDF, WindowUDFImpl,
};
use datafusion::scalar::ScalarValue;
use datafusion_ffi::udwf::FFI_WindowUDF;
use datafusion_python_util::parse_volatility;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::{PyCapsule, PyList, PyTuple};
use crate::common::data_type::PyScalarValue;
use crate::errors::{PyDataFusionResult, to_datafusion_err};
use crate::expr::PyExpr;
/// Rust-side handle to a Python partition-evaluator object.
///
/// The `PartitionEvaluator` implementation for this type forwards each
/// trait method to the same-named method on the wrapped Python object.
#[derive(Debug)]
struct RustPartitionEvaluator {
    /// Python object that receives the forwarded method calls.
    evaluator: Py<PyAny>,
}

impl RustPartitionEvaluator {
    /// Wraps an already-constructed Python evaluator object.
    fn new(evaluator: Py<PyAny>) -> Self {
        RustPartitionEvaluator { evaluator }
    }
}
impl PartitionEvaluator for RustPartitionEvaluator {
fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> {
Python::attach(|py| self.evaluator.bind(py).call_method0("memoize").map(|_| ()))
.map_err(|e| DataFusionError::Execution(format!("{e}")))
}
fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> {
Python::attach(|py| {
let py_args = vec![idx.into_pyobject(py)?, n_rows.into_pyobject(py)?];
let py_args = PyTuple::new(py, py_args)?;
self.evaluator
.bind(py)
.call_method1("get_range", py_args)
.and_then(|v| {
let tuple: Bound<'_, PyTuple> = v.extract()?;
if tuple.len() != 2 {
return Err(PyValueError::new_err(format!(
"Expected get_range to return tuple of length 2. Received length {}",
tuple.len()
)));
}
let start: usize = tuple.get_item(0).unwrap().extract()?;
let end: usize = tuple.get_item(1).unwrap().extract()?;
Ok(Range { start, end })
})
})
.map_err(|e| DataFusionError::Execution(format!("{e}")))
}
fn is_causal(&self) -> bool {
Python::attach(|py| {
self.evaluator
.bind(py)
.call_method0("is_causal")
.and_then(|v| v.extract())
.unwrap_or(false)
})
}
fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
Python::attach(|py| {
let py_values = PyList::new(
py,
values
.iter()
.map(|arg| arg.into_data().to_pyarrow(py).unwrap()),
)?;
let py_num_rows = num_rows.into_pyobject(py)?;
let py_args = PyTuple::new(py, vec![py_values.as_any(), &py_num_rows])?;
self.evaluator
.bind(py)
.call_method1("evaluate_all", py_args)
.map(|v| {
let array_data = ArrayData::from_pyarrow_bound(&v).unwrap();
make_array(array_data)
})
})
.map_err(to_datafusion_err)
}
fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
Python::attach(|py| {
let py_values = PyList::new(
py,
values
.iter()
.map(|arg| arg.into_data().to_pyarrow(py).unwrap()),
)?;
let range_tuple = PyTuple::new(py, vec![range.start, range.end])?;
let py_args = PyTuple::new(py, vec![py_values.as_any(), range_tuple.as_any()])?;
self.evaluator
.bind(py)
.call_method1("evaluate", py_args)
.and_then(|v| v.extract::<PyScalarValue>())
.map(|v| v.0)
})
.map_err(to_datafusion_err)
}
fn evaluate_all_with_rank(
&self,
num_rows: usize,
ranks_in_partition: &[Range<usize>],
) -> Result<ArrayRef> {
Python::attach(|py| {
let ranks = ranks_in_partition
.iter()
.map(|r| PyTuple::new(py, vec![r.start, r.end]))
.collect::<PyResult<Vec<_>>>()?;
// 1. cast args to Pyarrow array
let py_args = vec![
num_rows.into_pyobject(py)?.into_any(),
PyList::new(py, ranks)?.into_any(),
];
let py_args = PyTuple::new(py, py_args)?;
// 2. call function
self.evaluator
.bind(py)
.call_method1("evaluate_all_with_rank", py_args)
.map(|v| {
let array_data = ArrayData::from_pyarrow_bound(&v).unwrap();
make_array(array_data)
})
})
.map_err(to_datafusion_err)
}
fn supports_bounded_execution(&self) -> bool {
Python::attach(|py| {
self.evaluator
.bind(py)
.call_method0("supports_bounded_execution")
.and_then(|v| v.extract())
.unwrap_or(false)
})
}
fn uses_window_frame(&self) -> bool {
Python::attach(|py| {
self.evaluator
.bind(py)
.call_method0("uses_window_frame")
.and_then(|v| v.extract())
.unwrap_or(false)
})
}
fn include_rank(&self) -> bool {
Python::attach(|py| {
self.evaluator
.bind(py)
.call_method0("include_rank")
.and_then(|v| v.extract())
.unwrap_or(false)
})
}
}
fn instantiate_partition_evaluator(evaluator: &Py<PyAny>) -> Result<Box<dyn PartitionEvaluator>> {
let instance = Python::attach(|py| {
evaluator
.call0(py)
.map_err(|e| DataFusionError::Execution(e.to_string()))
})?;
Ok(Box::new(RustPartitionEvaluator::new(instance)))
}
/// Adapts a Python evaluator factory into a [`PartitionEvaluatorFactory`].
///
/// The returned factory takes ownership of `evaluator` and, on every
/// invocation, calls it to produce a new boxed partition evaluator.
///
/// This helper is kept for downstream callers that previously used it to
/// build a [`PartitionEvaluatorFactory`] for factory-based APIs. New
/// in-crate code should construct a [`PythonFunctionWindowUDF`] directly so
/// the codec can downcast and ship it inline.
pub fn to_rust_partition_evaluator(evaluator: Py<PyAny>) -> PartitionEvaluatorFactory {
    let factory = move || instantiate_partition_evaluator(&evaluator);
    Arc::new(factory)
}
/// Represents a WindowUDF.
//
// Registered with Python under the class name `WindowUDF` in module
// `datafusion` (see the `name` / `module` options below).
#[pyclass(
from_py_object,
frozen,
name = "WindowUDF",
module = "datafusion",
subclass
)]
#[derive(Debug, Clone)]
pub struct PyWindowUDF {
// Wrapped DataFusion window UDF; `__call__`, `__repr__` and the `name`
// getter all delegate to it.
pub(crate) function: WindowUDF,
}
#[pymethods]
impl PyWindowUDF {
    /// Builds a window UDF backed by a Python evaluator factory.
    ///
    /// The pyarrow type wrappers are unwrapped into plain `DataType`s and
    /// `volatility` is parsed with `parse_volatility`; a parse failure is
    /// returned to the caller.
    #[new]
    #[pyo3(signature=(name, evaluator, input_types, return_type, volatility))]
    fn new(
        name: &str,
        evaluator: Py<PyAny>,
        input_types: Vec<PyArrowType<DataType>>,
        return_type: PyArrowType<DataType>,
        volatility: &str,
    ) -> PyResult<Self> {
        let output_type = return_type.0;
        let arg_types = input_types
            .into_iter()
            .map(|wrapped| wrapped.0)
            .collect::<Vec<DataType>>();
        let volatility = parse_volatility(volatility)?;
        let udwf =
            PythonFunctionWindowUDF::new(name, evaluator, arg_types, output_type, volatility);
        Ok(Self {
            function: WindowUDF::from(udwf),
        })
    }

    /// creates a new PyExpr with the call of the udf
    #[pyo3(signature = (*args))]
    fn __call__(&self, args: Vec<PyExpr>) -> PyResult<PyExpr> {
        let mut exprs = Vec::with_capacity(args.len());
        for py_expr in &args {
            exprs.push(py_expr.expr.clone());
        }
        let call_expr = self.function.call(exprs);
        Ok(call_expr.into())
    }

    /// Builds a window UDF from an FFI capsule.
    ///
    /// `func` is either the capsule itself or an object exposing a
    /// `__datafusion_window_udf__()` method that returns it. The capsule
    /// must carry the name `datafusion_window_udf`.
    #[staticmethod]
    pub fn from_pycapsule(func: Bound<'_, PyAny>) -> PyDataFusionResult<Self> {
        let exports_capsule = func.hasattr("__datafusion_window_udf__")?;
        let provider = if exports_capsule {
            func.getattr("__datafusion_window_udf__")?.call0()?
        } else {
            func
        };
        let capsule = provider.cast::<PyCapsule>().map_err(to_datafusion_err)?;

        let raw: NonNull<FFI_WindowUDF> = capsule
            .pointer_checked(Some(c"datafusion_window_udf"))?
            .cast();
        // SAFETY: `pointer_checked` succeeded for the capsule name
        // `datafusion_window_udf`; this relies on the capsule's producer
        // having stored a valid `FFI_WindowUDF` under that name. The
        // reference is only used while `capsule` is still alive.
        let ffi_udwf = unsafe { raw.as_ref() };
        let shared: Arc<dyn WindowUDFImpl> = ffi_udwf.into();

        let function = WindowUDF::new_from_shared_impl(shared);
        Ok(Self { function })
    }

    /// Returns `WindowUDF(<name>)`.
    fn __repr__(&self) -> PyResult<String> {
        let udf_name = self.function.name();
        Ok(format!("WindowUDF({udf_name})"))
    }

    /// Name of the wrapped window UDF.
    #[getter]
    fn name(&self) -> &str {
        self.function.name()
    }
}
/// `WindowUDFImpl` for Python-defined window UDFs.
///
/// Holds the Python evaluator factory directly so the codec can
/// downcast and cloudpickle it across process boundaries. Replaces
/// the prior factory-erased `MultiColumnWindowUDF`; the old name is
/// kept as a type alias below for backward compatibility.
#[derive(Debug)]
pub struct PythonFunctionWindowUDF {
/// Name reported through `WindowUDFImpl::name`.
name: String,
/// Python callable that is invoked with no arguments to create a
/// partition evaluator object.
evaluator: Py<PyAny>,
/// Signature built with `Signature::exact` from the declared input
/// types and volatility.
signature: Signature,
/// Data type of the output field produced by `WindowUDFImpl::field`.
return_type: DataType,
}
/// Backward-compatible alias for downstream crates that referenced the
/// previous struct name. New code should use [`PythonFunctionWindowUDF`].
pub type MultiColumnWindowUDF = PythonFunctionWindowUDF;
impl PythonFunctionWindowUDF {
    /// Creates a window UDF definition from its parts.
    ///
    /// The signature is an exact match on `input_types` with the given
    /// `volatility`. `evaluator` is stored as-is and is not called here.
    pub fn new(
        name: impl Into<String>,
        evaluator: Py<PyAny>,
        input_types: Vec<DataType>,
        return_type: DataType,
        volatility: Volatility,
    ) -> Self {
        Self {
            name: name.into(),
            evaluator,
            signature: Signature::exact(input_types, volatility),
            return_type,
        }
    }

    /// Borrows the stored Python callable that produces a fresh partition
    /// evaluator instance per partition. The codec uses it to cloudpickle
    /// the evaluator factory across process boundaries.
    pub(crate) fn evaluator(&self) -> &Py<PyAny> {
        &self.evaluator
    }

    /// Borrows the declared return type.
    pub(crate) fn return_type(&self) -> &DataType {
        &self.return_type
    }
}
impl Eq for PythonFunctionWindowUDF {}

impl PartialEq for PythonFunctionWindowUDF {
    /// Two UDFs are equal when name, signature and return type match and
    /// their evaluators are either the same Python object or compare equal
    /// under Python `__eq__`.
    fn eq(&self, other: &Self) -> bool {
        // Cheap header comparison first; any mismatch settles the answer
        // without touching Python.
        if self.name != other.name
            || self.signature != other.signature
            || self.return_type != other.return_type
        {
            return false;
        }

        // Pointer-identity fast path: clones sharing the same Python
        // callable skip the GIL roundtrip.
        if self.evaluator.as_ptr() == other.evaluator.as_ptr() {
            return true;
        }

        // Two distinct callables: defer to Python `__eq__`. See
        // `PythonFunctionScalarUDF::eq` for the rationale on swallowing the
        // exception as `false` and logging at `debug`. FIXME: revisit if
        // upstream `WindowUDFImpl` exposes a fallible `PartialEq`.
        Python::attach(|py| {
            let lhs = self.evaluator.bind(py);
            let rhs = other.evaluator.bind(py);
            match lhs.eq(rhs) {
                Ok(equal) => equal,
                Err(e) => {
                    log::debug!(
                        target: "datafusion_python::udwf",
                        "PythonFunctionWindowUDF {:?} __eq__ raised; treating as unequal: {e}",
                        self.name,
                    );
                    false
                }
            }
        })
    }
}
impl std::hash::Hash for PythonFunctionWindowUDF {
    /// Feeds the name, signature and return type into `state`, in that order.
    ///
    /// The evaluator is intentionally left out: only the identifying header
    /// is hashed and `PartialEq` disambiguates evaluators. See
    /// `PythonFunctionScalarUDF`'s `Hash` impl for the rationale.
    fn hash<H>(&self, state: &mut H)
    where
        H: std::hash::Hasher,
    {
        use std::hash::Hash;

        Hash::hash(&self.name, state);
        Hash::hash(&self.signature, state);
        Hash::hash(&self.return_type, state);
    }
}
impl WindowUDFImpl for PythonFunctionWindowUDF {
fn name(&self) -> &str {
&self.name
}
fn signature(&self) -> &Signature {
&self.signature
}
fn field(&self, field_args: WindowUDFFieldArgs) -> Result<arrow::datatypes::FieldRef> {
// TODO: Should nullable always be `true`?
Ok(arrow::datatypes::Field::new(field_args.name(), self.return_type.clone(), true).into())
}
// TODO: Enable passing partition_evaluator_args to python?
fn partition_evaluator(
&self,
_partition_evaluator_args: PartitionEvaluatorArgs,
) -> Result<Box<dyn PartitionEvaluator>> {
instantiate_partition_evaluator(&self.evaluator)
}
}