blob: 90ce6bf0ee8794433816df1e8a820dbcd51ca873 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use pyo3::{basic::CompareOp, prelude::*};
use std::convert::{From, Into};
use datafusion::arrow::datatypes::DataType;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField};
use crate::errors::py_runtime_err;
use crate::expr::aggregate_expr::PyAggregateFunction;
use crate::expr::binary_expr::PyBinaryExpr;
use crate::expr::column::PyColumn;
use crate::expr::literal::PyLiteral;
use datafusion::scalar::ScalarValue;
pub mod aggregate;
pub mod aggregate_expr;
pub mod analyze;
pub mod binary_expr;
pub mod column;
pub mod filter;
pub mod limit;
pub mod literal;
pub mod logical_node;
pub mod projection;
pub mod sort;
pub mod table_scan;
/// A PyExpr that can be used on a DataFrame
///
/// Thin wrapper around a DataFusion [`Expr`]. It is exported to Python under
/// the name `Expr` in the `datafusion.expr` module, and the `subclass` flag
/// allows Python classes to inherit from it.
#[pyclass(name = "Expr", module = "datafusion.expr", subclass)]
#[derive(Debug, Clone)]
pub(crate) struct PyExpr {
/// The wrapped DataFusion expression.
pub(crate) expr: Expr,
}
impl From<PyExpr> for Expr {
fn from(expr: PyExpr) -> Expr {
expr.expr
}
}
impl From<Expr> for PyExpr {
fn from(expr: Expr) -> PyExpr {
PyExpr { expr }
}
}
#[pymethods]
impl PyExpr {
/// Return the specific expression
fn to_variant(&self, py: Python) -> PyResult<PyObject> {
Python::with_gil(|_| match &self.expr {
Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_py(py)),
Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_py(py)),
Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_py(py)),
Expr::AggregateFunction(expr) => {
Ok(PyAggregateFunction::from(expr.clone()).into_py(py))
}
other => Err(py_runtime_err(format!(
"Cannot convert this Expr to a Python object: {:?}",
other
))),
})
}
fn __richcmp__(&self, other: PyExpr, op: CompareOp) -> PyExpr {
let expr = match op {
CompareOp::Lt => self.expr.clone().lt(other.expr),
CompareOp::Le => self.expr.clone().lt_eq(other.expr),
CompareOp::Eq => self.expr.clone().eq(other.expr),
CompareOp::Ne => self.expr.clone().not_eq(other.expr),
CompareOp::Gt => self.expr.clone().gt(other.expr),
CompareOp::Ge => self.expr.clone().gt_eq(other.expr),
};
expr.into()
}
fn __repr__(&self) -> PyResult<String> {
Ok(format!("Expr({})", self.expr))
}
fn __add__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
Ok((self.expr.clone() + rhs.expr).into())
}
fn __sub__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
Ok((self.expr.clone() - rhs.expr).into())
}
fn __truediv__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
Ok((self.expr.clone() / rhs.expr).into())
}
fn __mul__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
Ok((self.expr.clone() * rhs.expr).into())
}
fn __mod__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
Ok(self.expr.clone().modulus(rhs.expr).into())
}
fn __and__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
Ok(self.expr.clone().and(rhs.expr).into())
}
fn __or__(&self, rhs: PyExpr) -> PyResult<PyExpr> {
Ok(self.expr.clone().or(rhs.expr).into())
}
fn __invert__(&self) -> PyResult<PyExpr> {
Ok(self.expr.clone().not().into())
}
fn __getitem__(&self, key: &str) -> PyResult<PyExpr> {
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(self.expr.clone()),
ScalarValue::Utf8(Some(key.to_string())),
))
.into())
}
#[staticmethod]
pub fn literal(value: ScalarValue) -> PyExpr {
lit(value).into()
}
#[staticmethod]
pub fn column(value: &str) -> PyExpr {
col(value).into()
}
/// assign a name to the PyExpr
pub fn alias(&self, name: &str) -> PyExpr {
self.expr.clone().alias(name).into()
}
/// Create a sort PyExpr from an existing PyExpr.
#[pyo3(signature = (ascending=true, nulls_first=true))]
pub fn sort(&self, ascending: bool, nulls_first: bool) -> PyExpr {
self.expr.clone().sort(ascending, nulls_first).into()
}
pub fn is_null(&self) -> PyExpr {
self.expr.clone().is_null().into()
}
pub fn cast(&self, to: PyArrowType<DataType>) -> PyExpr {
// self.expr.cast_to() requires DFSchema to validate that the cast
// is supported, omit that for now
let expr = Expr::Cast(Cast::new(Box::new(self.expr.clone()), to.0));
expr.into()
}
}
/// Initializes the `expr` module to match the pattern of `datafusion-expr`
/// <https://docs.rs/datafusion-expr/latest/datafusion_expr/>.
///
/// Registers the expression wrapper classes and the operator classes on `m`,
/// each exactly once. The first registration failure is propagated to the
/// caller.
pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
    // expressions
    m.add_class::<PyExpr>()?;
    m.add_class::<PyColumn>()?;
    m.add_class::<PyLiteral>()?;
    m.add_class::<PyBinaryExpr>()?;
    m.add_class::<PyAggregateFunction>()?;
    // operators
    m.add_class::<table_scan::PyTableScan>()?;
    m.add_class::<projection::PyProjection>()?;
    m.add_class::<filter::PyFilter>()?;
    m.add_class::<limit::PyLimit>()?;
    m.add_class::<aggregate::PyAggregate>()?;
    m.add_class::<sort::PySort>()?;
    m.add_class::<analyze::PyAnalyze>()?;
    Ok(())
}