Add bindings for Projection (#180)
* Add bindings for Projection
* Address review comments
* cargo fmt
* datafusion_common::DFField bindings
* Revert "datafusion_common::DFField bindings"
This reverts commit 752fe1c124cfe8c087ac0f13b0015a2e5eaa8d6b.
diff --git a/datafusion/__init__.py b/datafusion/__init__.py
index 6784eea..4667835 100644
--- a/datafusion/__init__.py
+++ b/datafusion/__init__.py
@@ -41,6 +41,7 @@
from .expr import (
Expr,
+ Projection,
TableScan,
)
@@ -58,6 +59,7 @@
"column",
"literal",
"TableScan",
+ "Projection",
"DFSchema",
]
diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index 66db592..7bdbd83 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -32,6 +32,7 @@
from datafusion.expr import (
Expr,
+ Projection,
TableScan,
)
@@ -53,12 +54,12 @@
]:
assert klass.__module__ == "datafusion"
+ for klass in [Expr, Projection, TableScan]:
+ assert klass.__module__ == "datafusion.expr"
+
for klass in [DFSchema]:
assert klass.__module__ == "datafusion.common"
- for klass in [Expr, TableScan]:
- assert klass.__module__ == "datafusion.expr"
-
def test_import_from_functions_submodule():
from datafusion.functions import abs, sin # noqa
diff --git a/src/expr.rs b/src/expr.rs
index dceedc1..f3695fe 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -24,6 +24,8 @@
use datafusion::scalar::ScalarValue;
+pub mod logical_node;
+pub mod projection;
pub mod table_scan;
/// A PyExpr that can be used on a DataFrame
@@ -140,5 +142,6 @@
pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_class::<PyExpr>()?;
m.add_class::<table_scan::PyTableScan>()?;
+ m.add_class::<projection::PyProjection>()?;
Ok(())
}
diff --git a/src/expr/logical_node.rs b/src/expr/logical_node.rs
new file mode 100644
index 0000000..1bb3fa7
--- /dev/null
+++ b/src/expr/logical_node.rs
@@ -0,0 +1,25 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::sql::logical::PyLogicalPlan;
+
+/// Representation of a `LogicalNode` in the overall `LogicalPlan`.
+/// Every "node" shares the traits declared here.
+pub trait LogicalNode {
+ /// The input plan(s) to the current logical node instance, returned as a `Vec`.
+ fn input(&self) -> Vec<PyLogicalPlan>;
+}
diff --git a/src/expr/projection.rs b/src/expr/projection.rs
new file mode 100644
index 0000000..6d04e59
--- /dev/null
+++ b/src/expr/projection.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::Projection;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::errors::py_runtime_err;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+// Python-facing wrapper around DataFusion's `Projection` logical plan node.
+// It is registered under the Python name `Projection` in the `datafusion.expr`
+// module and is declared with `subclass`.
+#[pyclass(name = "Projection", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyProjection {
+ // The wrapped DataFusion node; every accessor on this type reads from it.
+ projection: Projection,
+}
+
+// Wraps a DataFusion `Projection` in its Python-facing counterpart.
+impl From<Projection> for PyProjection {
+    fn from(inner: Projection) -> Self {
+        Self { projection: inner }
+    }
+}
+
+// Converts the Python wrapper back into a DataFusion `Projection`.
+//
+// The node is rebuilt through `Projection::try_new_with_schema`, so whatever
+// error that constructor reports is surfaced as the `DataFusionError`.
+impl TryFrom<PyProjection> for Projection {
+    type Error = DataFusionError;
+
+    fn try_from(py_proj: PyProjection) -> Result<Self, Self::Error> {
+        // `py_proj` is taken by value, so all three fields can be moved out of
+        // it directly; cloning the input first would be redundant.
+        Projection::try_new_with_schema(
+            py_proj.projection.expr,
+            py_proj.projection.input,
+            py_proj.projection.schema,
+        )
+    }
+}
+
+// Human-readable rendering of the node: its expressions, its input plan and
+// its projected schema, each formatted with `{:?}`.
+impl Display for PyProjection {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ // NOTE(review): the format string spans several source lines without `\`
+ // line continuations, so the physical line breaks (and whatever leading
+ // indentation the continuation lines carry) are written to the output in
+ // addition to each explicit `\n`. Do not re-indent the literal unless a
+ // change in the rendered text is intended.
+ write!(
+ f,
+ "Projection
+ \nExpr(s): {:?}
+ \nInput: {:?}
+ \nProjected Schema: {:?}",
+ &self.projection.expr, &self.projection.input, &self.projection.schema,
+ )
+ }
+}
+
+// Methods exposed to Python on `Projection`. The `#[pyo3(name = ...)]`
+// attributes give each accessor its Python-visible name.
+#[pymethods]
+impl PyProjection {
+    /// Retrieves the expressions for this `Projection`
+    #[pyo3(name = "projections")]
+    fn py_projections(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .projection
+            .expr
+            .iter()
+            .cloned()
+            .map(PyExpr::from)
+            .collect())
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Projection` node
+    #[pyo3(name = "input")]
+    fn py_input(&self) -> PyResult<PyLogicalPlan> {
+        // DataFusion makes a loose guarantee that each Projection has an input. That is
+        // still checked here: the first input is taken if present, and an empty input
+        // list becomes a runtime error instead of an out-of-bounds index.
+        LogicalNode::input(self)
+            .into_iter()
+            .next()
+            .ok_or_else(|| {
+                py_runtime_err(format!(
+                    "Expected `input` field for Projection node: {}",
+                    self
+                ))
+            })
+    }
+
+    /// Resulting Schema for this `Projection` node instance
+    #[pyo3(name = "schema")]
+    fn py_schema(&self) -> PyResult<PyDFSchema> {
+        // The schema is cloned out from behind its shared pointer, then converted.
+        Ok((*self.projection.schema).clone().into())
+    }
+
+    /// Python `repr()`: the `Display` rendering wrapped in `Projection(...)`
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Projection({})", self))
+    }
+}
+
+// A projection has exactly one upstream plan, so the returned vector always
+// holds a single element.
+impl LogicalNode for PyProjection {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        // Clone the plan out from behind its pointer before wrapping it.
+        let upstream = (*self.projection.input).clone();
+        vec![PyLogicalPlan::from(upstream)]
+    }
+}
diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs
index bc7d68a..00504b9 100644
--- a/src/expr/table_scan.rs
+++ b/src/expr/table_scan.rs
@@ -19,7 +19,7 @@
use pyo3::prelude::*;
use std::fmt::{self, Display, Formatter};
-use crate::expr::PyExpr;
+use crate::{common::df_schema::PyDFSchema, expr::PyExpr};
#[pyclass(name = "TableScan", module = "datafusion.expr", subclass)]
#[derive(Clone)]
@@ -49,8 +49,8 @@
\nFilters: {:?}",
&self.table_scan.table_name,
&self.py_projections(),
- self.table_scan.projected_schema,
- self.py_filters(),
+ &self.py_schema(),
+ &self.py_filters(),
)
}
}
@@ -89,13 +89,11 @@
}
}
- /// TODO: Bindings for `DFSchema` need to exist first. Left as a
- /// placeholder to display intention to add when able to.
- // /// Resulting schema from the `TableScan` operation
- // #[pyo3(name = "projectedSchema")]
- // fn py_projected_schema(&self) -> PyResult<DFSchemaRef> {
- // Ok(self.table_scan.projected_schema)
- // }
+    /// Resulting schema from the `TableScan` operation
+    #[pyo3(name = "schema")]
+    fn py_schema(&self) -> PyResult<PyDFSchema> {
+        // Copy the projected schema out from behind its pointer, then wrap it.
+        let projected = (*self.table_scan.projected_schema).clone();
+        Ok(PyDFSchema::from(projected))
+    }
/// Certain `TableProvider` physical readers offer the capability to filter rows that
/// are read at read time. These `filters` are contained here.