[branch-0.8] Merge from main (#201)
* changelog (#188)
* Add Python wrapper for LogicalPlan::Sort (#196)
* Add Python wrapper for LogicalPlan::Aggregate (#195)
* Add Python wrapper for LogicalPlan::Limit (#193)
* Add Python wrapper for LogicalPlan::Filter (#192)
* Add Python wrapper for LogicalPlan::Filter
* clippy
* clippy
* Update src/expr/filter.rs
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
---------
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
* Add tests for recently added functionality (#199)
* Add experimental support for executing SQL with Polars and Pandas (#190)
* Run `maturin develop` instead of `cargo build` in verification script (#200)
* Implement `to_pandas()` (#197)
* Implement to_pandas()
* Update documentation
* Write unit test
* Add support for cudf as a physical execution engine (#205)
* Update README in preparation for 0.8 release (#206)
* Analyze table bindings (#204)
* method for getting the internal LogicalPlan instance
* Add explain plan method
* Add bindings for analyze table
* Add to_variant
* cargo fmt
* black and flake8 formatting
* changelog (#209)
---------
Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Dejan Simic <10134699+simicd@users.noreply.github.com>
Co-authored-by: Jeremy Dyer <jdye64@gmail.com>
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8d47fdf..f196916 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,50 +19,36 @@
# Changelog
-## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) (2023-02-17)
+## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) (2023-02-22)
-[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0...0.8.0)
+[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.8.0-rc1...0.8.0)
**Implemented enhancements:**
-- Add bindings for datafusion\_common::DFField [\#184](https://github.com/apache/arrow-datafusion-python/issues/184)
-- Add bindings for DFSchema/DFSchemaRef [\#181](https://github.com/apache/arrow-datafusion-python/issues/181)
-- Add bindings for datafusion\_expr Projection [\#179](https://github.com/apache/arrow-datafusion-python/issues/179)
-- Add bindings for `TableScan` struct from `datafusion_expr::TableScan` [\#177](https://github.com/apache/arrow-datafusion-python/issues/177)
-- Add a "mapping" struct for types [\#172](https://github.com/apache/arrow-datafusion-python/issues/172)
-- Improve string representation of datafusion classes \(dataframe, context, expression, ...\) [\#158](https://github.com/apache/arrow-datafusion-python/issues/158)
-- Add DataFrame count method [\#151](https://github.com/apache/arrow-datafusion-python/issues/151)
-- \[REQUEST\] Github Actions Improvements [\#146](https://github.com/apache/arrow-datafusion-python/issues/146)
-- Change default branch name from master to main [\#144](https://github.com/apache/arrow-datafusion-python/issues/144)
-- Bump pyo3 to 0.18.0 [\#140](https://github.com/apache/arrow-datafusion-python/issues/140)
-- Add script for Python linting [\#134](https://github.com/apache/arrow-datafusion-python/issues/134)
-- Add Python bindings for substrait module [\#132](https://github.com/apache/arrow-datafusion-python/issues/132)
-- Expand unit tests for built-in functions [\#128](https://github.com/apache/arrow-datafusion-python/issues/128)
-- support creating arrow-datafusion-python conda environment [\#122](https://github.com/apache/arrow-datafusion-python/issues/122)
-- Build Python source distribution in GitHub workflow [\#81](https://github.com/apache/arrow-datafusion-python/issues/81)
-- EPIC: Add all functions to python binding `functions` [\#72](https://github.com/apache/arrow-datafusion-python/issues/72)
+- Add support for cuDF physical execution engine [\#202](https://github.com/apache/arrow-datafusion-python/issues/202)
+- Make it easier to create a Pandas dataframe from DataFusion query results [\#139](https://github.com/apache/arrow-datafusion-python/issues/139)
**Fixed bugs:**
-- Build is broken [\#161](https://github.com/apache/arrow-datafusion-python/issues/161)
-- Out of memory when sorting [\#157](https://github.com/apache/arrow-datafusion-python/issues/157)
-- window\_lead test appears to be non-deterministic [\#135](https://github.com/apache/arrow-datafusion-python/issues/135)
-- Reading csv does not work [\#130](https://github.com/apache/arrow-datafusion-python/issues/130)
-- Github actions produce a lot of warnings [\#94](https://github.com/apache/arrow-datafusion-python/issues/94)
-- ASF source release tarball has wrong directory name [\#90](https://github.com/apache/arrow-datafusion-python/issues/90)
-- Python Release Build failing after upgrading to maturin 14.2 [\#87](https://github.com/apache/arrow-datafusion-python/issues/87)
-- Maturin build hangs on Linux ARM64 [\#84](https://github.com/apache/arrow-datafusion-python/issues/84)
-- Cannot install on Mac M1 from source tarball from testpypi [\#82](https://github.com/apache/arrow-datafusion-python/issues/82)
-- ImportPathMismatchError when running pytest locally [\#77](https://github.com/apache/arrow-datafusion-python/issues/77)
+- Build error: could not compile `thiserror` due to 2 previous errors [\#69](https://github.com/apache/arrow-datafusion-python/issues/69)
**Closed issues:**
-- Publish documentation for Python bindings [\#39](https://github.com/apache/arrow-datafusion-python/issues/39)
-- Add Python binding for `approx_median` [\#32](https://github.com/apache/arrow-datafusion-python/issues/32)
-- Release version 0.7.0 [\#7](https://github.com/apache/arrow-datafusion-python/issues/7)
+- Integrate with the new `object_store` crate [\#22](https://github.com/apache/arrow-datafusion-python/issues/22)
**Merged pull requests:**
+- Update README in preparation for 0.8 release [\#206](https://github.com/apache/arrow-datafusion-python/pull/206) ([andygrove](https://github.com/andygrove))
+- Add support for cudf as a physical execution engine [\#205](https://github.com/apache/arrow-datafusion-python/pull/205) ([jdye64](https://github.com/jdye64))
+- Run `maturin develop` instead of `cargo build` in verification script [\#200](https://github.com/apache/arrow-datafusion-python/pull/200) ([andygrove](https://github.com/andygrove))
+- Add tests for recently added functionality [\#199](https://github.com/apache/arrow-datafusion-python/pull/199) ([andygrove](https://github.com/andygrove))
+- Implement `to_pandas()` [\#197](https://github.com/apache/arrow-datafusion-python/pull/197) ([simicd](https://github.com/simicd))
+- Add Python wrapper for LogicalPlan::Sort [\#196](https://github.com/apache/arrow-datafusion-python/pull/196) ([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Aggregate [\#195](https://github.com/apache/arrow-datafusion-python/pull/195) ([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Limit [\#193](https://github.com/apache/arrow-datafusion-python/pull/193) ([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Filter [\#192](https://github.com/apache/arrow-datafusion-python/pull/192) ([andygrove](https://github.com/andygrove))
+- Add experimental support for executing SQL with Polars and Pandas [\#190](https://github.com/apache/arrow-datafusion-python/pull/190) ([andygrove](https://github.com/andygrove))
+- Update changelog for 0.8 release [\#188](https://github.com/apache/arrow-datafusion-python/pull/188) ([andygrove](https://github.com/andygrove))
- Add ability to execute ExecutionPlan and get a stream of RecordBatch [\#186](https://github.com/apache/arrow-datafusion-python/pull/186) ([andygrove](https://github.com/andygrove))
- Dffield bindings [\#185](https://github.com/apache/arrow-datafusion-python/pull/185) ([jdye64](https://github.com/jdye64))
- Add bindings for DFSchema [\#183](https://github.com/apache/arrow-datafusion-python/pull/183) ([jdye64](https://github.com/jdye64))
@@ -118,6 +104,52 @@
- Update release instructions [\#83](https://github.com/apache/arrow-datafusion-python/pull/83) ([andygrove](https://github.com/andygrove))
- \[Functions\] - Add python function binding to `functions` [\#73](https://github.com/apache/arrow-datafusion-python/pull/73) ([francis-du](https://github.com/francis-du))
+## [0.8.0-rc1](https://github.com/apache/arrow-datafusion-python/tree/0.8.0-rc1) (2023-02-17)
+
+[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0-rc2...0.8.0-rc1)
+
+**Implemented enhancements:**
+
+- Add bindings for datafusion\_common::DFField [\#184](https://github.com/apache/arrow-datafusion-python/issues/184)
+- Add bindings for DFSchema/DFSchemaRef [\#181](https://github.com/apache/arrow-datafusion-python/issues/181)
+- Add bindings for datafusion\_expr Projection [\#179](https://github.com/apache/arrow-datafusion-python/issues/179)
+- Add bindings for `TableScan` struct from `datafusion_expr::TableScan` [\#177](https://github.com/apache/arrow-datafusion-python/issues/177)
+- Add a "mapping" struct for types [\#172](https://github.com/apache/arrow-datafusion-python/issues/172)
+- Improve string representation of datafusion classes \(dataframe, context, expression, ...\) [\#158](https://github.com/apache/arrow-datafusion-python/issues/158)
+- Add DataFrame count method [\#151](https://github.com/apache/arrow-datafusion-python/issues/151)
+- \[REQUEST\] Github Actions Improvements [\#146](https://github.com/apache/arrow-datafusion-python/issues/146)
+- Change default branch name from master to main [\#144](https://github.com/apache/arrow-datafusion-python/issues/144)
+- Bump pyo3 to 0.18.0 [\#140](https://github.com/apache/arrow-datafusion-python/issues/140)
+- Add script for Python linting [\#134](https://github.com/apache/arrow-datafusion-python/issues/134)
+- Add Python bindings for substrait module [\#132](https://github.com/apache/arrow-datafusion-python/issues/132)
+- Expand unit tests for built-in functions [\#128](https://github.com/apache/arrow-datafusion-python/issues/128)
+- support creating arrow-datafusion-python conda environment [\#122](https://github.com/apache/arrow-datafusion-python/issues/122)
+- Build Python source distribution in GitHub workflow [\#81](https://github.com/apache/arrow-datafusion-python/issues/81)
+- EPIC: Add all functions to python binding `functions` [\#72](https://github.com/apache/arrow-datafusion-python/issues/72)
+
+**Fixed bugs:**
+
+- Build is broken [\#161](https://github.com/apache/arrow-datafusion-python/issues/161)
+- Out of memory when sorting [\#157](https://github.com/apache/arrow-datafusion-python/issues/157)
+- window\_lead test appears to be non-deterministic [\#135](https://github.com/apache/arrow-datafusion-python/issues/135)
+- Reading csv does not work [\#130](https://github.com/apache/arrow-datafusion-python/issues/130)
+- Github actions produce a lot of warnings [\#94](https://github.com/apache/arrow-datafusion-python/issues/94)
+- ASF source release tarball has wrong directory name [\#90](https://github.com/apache/arrow-datafusion-python/issues/90)
+- Python Release Build failing after upgrading to maturin 14.2 [\#87](https://github.com/apache/arrow-datafusion-python/issues/87)
+- Maturin build hangs on Linux ARM64 [\#84](https://github.com/apache/arrow-datafusion-python/issues/84)
+- Cannot install on Mac M1 from source tarball from testpypi [\#82](https://github.com/apache/arrow-datafusion-python/issues/82)
+- ImportPathMismatchError when running pytest locally [\#77](https://github.com/apache/arrow-datafusion-python/issues/77)
+
+**Closed issues:**
+
+- Publish documentation for Python bindings [\#39](https://github.com/apache/arrow-datafusion-python/issues/39)
+- Add Python binding for `approx_median` [\#32](https://github.com/apache/arrow-datafusion-python/issues/32)
+- Release version 0.7.0 [\#7](https://github.com/apache/arrow-datafusion-python/issues/7)
+
+## [0.7.0-rc2](https://github.com/apache/arrow-datafusion-python/tree/0.7.0-rc2) (2022-11-26)
+
+[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0...0.7.0-rc2)
+
## [Unreleased](https://github.com/datafusion-contrib/datafusion-python/tree/HEAD)
diff --git a/Cargo.lock b/Cargo.lock
index 5059afa..04a2ea8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1134,9 +1134,9 @@
[[package]]
name = "http"
-version = "0.2.8"
+version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
+checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
dependencies = [
"bytes",
"fnv",
@@ -1385,9 +1385,9 @@
[[package]]
name = "libflate"
-version = "1.2.0"
+version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "05605ab2bce11bcfc0e9c635ff29ef8b2ea83f29be257ee7d730cac3ee373093"
+checksum = "97822bf791bd4d5b403713886a5fbe8bf49520fe78e323b0dc480ca1a03e50b0"
dependencies = [
"adler32",
"crc32fast",
@@ -1396,9 +1396,9 @@
[[package]]
name = "libflate_lz77"
-version = "1.1.0"
+version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "39a734c0493409afcd49deee13c006a04e3586b9761a03543c6272c9c51f2f5a"
+checksum = "a52d3a8bfc85f250440e4424db7d857e241a3aebbbe301f3eb606ab15c39acbf"
dependencies = [
"rle-decode-fast",
]
@@ -2359,9 +2359,9 @@
[[package]]
name = "slab"
-version = "0.4.7"
+version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
+checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d"
dependencies = [
"autocfg",
]
@@ -2635,9 +2635,9 @@
[[package]]
name = "tokio-stream"
-version = "0.1.11"
+version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
+checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313"
dependencies = [
"futures-core",
"pin-project-lite",
diff --git a/README.md b/README.md
index ab89ff6..e78f613 100644
--- a/README.md
+++ b/README.md
@@ -24,19 +24,30 @@
This is a Python library that binds to [Apache Arrow](https://arrow.apache.org/) in-memory query engine [DataFusion](https://github.com/apache/arrow-datafusion).
-Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV
-files, run it in a multi-threaded environment, and obtain the result back in Python.
+DataFusion's Python bindings can be used as an end-user tool as well as a foundation for building new systems.
-It also allows you to use UDFs and UDAFs for complex operations.
+## Features
-The major advantage of this library over other execution engines is that this library achieves zero-copy between
-Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart
-from having to lock the GIL when running those operations.
+- Execute queries using SQL or DataFrames against CSV, Parquet, and JSON data sources
+- Queries are optimized using DataFusion's query optimizer
+- Execute user-defined Python code from SQL
+- Exchange data with Pandas and other DataFrame libraries that support PyArrow
+- Serialize and deserialize query plans in Substrait format
+- Experimental support for executing SQL queries against Polars, Pandas and cuDF
-Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions
-about thread safety and lack of memory leaks.
+## Comparison with other projects
-Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html).
+Here is a comparison with similar projects that may help you understand when DataFusion might or might not be
+suitable for your needs:
+
+- [DuckDB](http://www.duckdb.org/) is an open source, in-process analytic database. Like DataFusion, it supports
+  very fast execution, both from its custom file format and directly from Parquet files. Unlike DataFusion, it is
+  written in C/C++ and is primarily used directly as a serverless database and query system rather than as a
+  library for building such systems.
+
+- [Polars](http://pola.rs/) is one of the fastest DataFrame libraries at the time of writing. Like DataFusion, it
+  is also written in Rust and uses the Apache Arrow memory model, but unlike DataFusion it does not provide full
+  SQL support or as many extension points.
## Example Usage
@@ -47,12 +58,8 @@
- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
-See the [examples](examples) directory for more examples.
-
```python
from datafusion import SessionContext
-import pandas as pd
-import pyarrow as pa
# Create a DataFusion context
ctx = SessionContext()
@@ -67,17 +74,11 @@
"group by passenger_count "
"order by passenger_count")
-# collect as list of pyarrow.RecordBatch
-results = df.collect()
-
-# get first batch
-batch = results[0]
-
# convert to Pandas
-df = batch.to_pandas()
+pandas_df = df.to_pandas()
# create a chart
-fig = df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
+fig = pandas_df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
fig.savefig('chart.png')
```
@@ -85,42 +86,30 @@
-## Substrait Support
+## More Examples
-`arrow-datafusion-python` has bindings which allow for serializing a SQL query to substrait protobuf format and deserializing substrait protobuf bytes to a DataFusion `LogicalPlan`, `PyLogicalPlan` in a Python context, which can then be executed.
+See [examples](examples/README.md) for more information.
-### Example of Serializing/Deserializing Substrait Plans
+### Executing Queries with DataFusion
-```python
-from datafusion import SessionContext
-from datafusion import substrait as ss
+- [Query a Parquet file using SQL](./examples/sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./examples/dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./examples/sql-to-pandas.py)
+- [Query PyArrow Data](./examples/query-pyarrow-data.py)
-# Create a DataFusion context
-ctx = SessionContext()
+### Running User-Defined Python Code
-# Register table with context
-ctx.register_parquet('aggregate_test_data', './testing/data/csv/aggregate_test_100.csv')
+- [Register a Python UDF with DataFusion](./examples/python-udf.py)
+- [Register a Python UDAF with DataFusion](./examples/python-udaf.py)
-substrait_plan = ss.substrait.serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx)
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+### Substrait Support
-# Alternative serialization approaches
-# type(substrait_bytes) -> <class 'list'>, at this point the bytes can be distributed to file, network, etc safely
-# where they could subsequently be deserialized on the receiving end.
-substrait_bytes = ss.substrait.serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx)
+- [Serialize query plans using Substrait](./examples/substrait.py)
-# Imagine here bytes would be read from network, file, etc ... for example brevity this is omitted and variable is simply reused
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
-substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)
+### Executing SQL against DataFrame Libraries (Experimental)
-# type(df_logical_plan) -> <class 'substrait.LogicalPlan'>
-df_logical_plan = ss.substrait.consumer.from_substrait_plan(ctx, substrait_plan)
-
-# Back to Substrait Plan just for demonstration purposes
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
-substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan)
-
-```
+- [Executing SQL on Polars](./examples/sql-on-polars.py)
+- [Executing SQL on Pandas](./examples/sql-on-pandas.py)
## How to install (from pip)
diff --git a/conda/environments/datafusion-dev.yaml b/conda/environments/datafusion-dev.yaml
index 0e17e16..d9405e4 100644
--- a/conda/environments/datafusion-dev.yaml
+++ b/conda/environments/datafusion-dev.yaml
@@ -28,7 +28,7 @@
- pytest
- toml
- importlib_metadata
-- python>=3.7,<3.11
+- python>=3.10
# Packages useful for building distributions and releasing
- mamba
- conda-build
@@ -38,4 +38,7 @@
- pydata-sphinx-theme==0.8.0
- myst-parser
- jinja2
+# GPU packages
+- cudf
+- cudatoolkit=11.8
name: datafusion-dev
diff --git a/datafusion/__init__.py b/datafusion/__init__.py
index b6cd517..46206f0 100644
--- a/datafusion/__init__.py
+++ b/datafusion/__init__.py
@@ -41,8 +41,12 @@
)
from .expr import (
+ Analyze,
Expr,
+ Filter,
+ Limit,
Projection,
+ Sort,
TableScan,
)
@@ -63,6 +67,10 @@
"Projection",
"DFSchema",
"DFField",
+ "Analyze",
+ "Sort",
+ "Limit",
+ "Filter",
]
diff --git a/datafusion/cudf.py b/datafusion/cudf.py
new file mode 100644
index 0000000..c38819c
--- /dev/null
+++ b/datafusion/cudf.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import cudf
+import datafusion
+from datafusion.expr import Projection, TableScan, Column
+
+
+class SessionContext:
+ def __init__(self):
+ self.datafusion_ctx = datafusion.SessionContext()
+ self.parquet_tables = {}
+
+ def register_parquet(self, name, path):
+ self.parquet_tables[name] = path
+ self.datafusion_ctx.register_parquet(name, path)
+
+ def to_cudf_expr(self, expr):
+ # get Python wrapper for logical expression
+ expr = expr.to_variant()
+
+ if isinstance(expr, Column):
+ return expr.name()
+ else:
+ raise Exception("unsupported expression: {}".format(expr))
+
+    def to_cudf_df(self, plan):
+        # recurse down first to translate inputs into cuDF DataFrames
+ inputs = [self.to_cudf_df(x) for x in plan.inputs()]
+
+ # get Python wrapper for logical operator node
+ node = plan.to_variant()
+
+ if isinstance(node, Projection):
+ args = [self.to_cudf_expr(expr) for expr in node.projections()]
+ return inputs[0][args]
+ elif isinstance(node, TableScan):
+ return cudf.read_parquet(self.parquet_tables[node.table_name()])
+ else:
+ raise Exception(
+ "unsupported logical operator: {}".format(type(node))
+ )
+
+ def sql(self, sql):
+ datafusion_df = self.datafusion_ctx.sql(sql)
+ plan = datafusion_df.logical_plan()
+ return self.to_cudf_df(plan)
diff --git a/datafusion/pandas.py b/datafusion/pandas.py
new file mode 100644
index 0000000..f8e5651
--- /dev/null
+++ b/datafusion/pandas.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pandas as pd
+import datafusion
+from datafusion.expr import Projection, TableScan, Column
+
+
+class SessionContext:
+ def __init__(self):
+ self.datafusion_ctx = datafusion.SessionContext()
+ self.parquet_tables = {}
+
+ def register_parquet(self, name, path):
+ self.parquet_tables[name] = path
+ self.datafusion_ctx.register_parquet(name, path)
+
+ def to_pandas_expr(self, expr):
+ # get Python wrapper for logical expression
+ expr = expr.to_variant()
+
+ if isinstance(expr, Column):
+ return expr.name()
+ else:
+ raise Exception("unsupported expression: {}".format(expr))
+
+ def to_pandas_df(self, plan):
+ # recurse down first to translate inputs into pandas data frames
+ inputs = [self.to_pandas_df(x) for x in plan.inputs()]
+
+ # get Python wrapper for logical operator node
+ node = plan.to_variant()
+
+ if isinstance(node, Projection):
+ args = [self.to_pandas_expr(expr) for expr in node.projections()]
+ return inputs[0][args]
+ elif isinstance(node, TableScan):
+ return pd.read_parquet(self.parquet_tables[node.table_name()])
+ else:
+ raise Exception(
+ "unsupported logical operator: {}".format(type(node))
+ )
+
+ def sql(self, sql):
+ datafusion_df = self.datafusion_ctx.sql(sql)
+ plan = datafusion_df.logical_plan()
+ return self.to_pandas_df(plan)
diff --git a/datafusion/polars.py b/datafusion/polars.py
new file mode 100644
index 0000000..a1bafbe
--- /dev/null
+++ b/datafusion/polars.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import polars
+import datafusion
+from datafusion.expr import Projection, TableScan, Aggregate
+from datafusion.expr import Column, AggregateFunction
+
+
+class SessionContext:
+ def __init__(self):
+ self.datafusion_ctx = datafusion.SessionContext()
+ self.parquet_tables = {}
+
+ def register_parquet(self, name, path):
+ self.parquet_tables[name] = path
+ self.datafusion_ctx.register_parquet(name, path)
+
+ def to_polars_expr(self, expr):
+ # get Python wrapper for logical expression
+ expr = expr.to_variant()
+
+ if isinstance(expr, Column):
+ return polars.col(expr.name())
+ else:
+ raise Exception("unsupported expression: {}".format(expr))
+
+ def to_polars_df(self, plan):
+ # recurse down first to translate inputs into Polars data frames
+ inputs = [self.to_polars_df(x) for x in plan.inputs()]
+
+ # get Python wrapper for logical operator node
+ node = plan.to_variant()
+
+ if isinstance(node, Projection):
+ args = [self.to_polars_expr(expr) for expr in node.projections()]
+ return inputs[0].select(*args)
+ elif isinstance(node, Aggregate):
+ groupby_expr = [
+ self.to_polars_expr(expr) for expr in node.group_by_exprs()
+ ]
+ aggs = []
+ for expr in node.aggregate_exprs():
+ expr = expr.to_variant()
+ if isinstance(expr, AggregateFunction):
+ if expr.aggregate_type() == "COUNT":
+ aggs.append(polars.count().alias("{}".format(expr)))
+ else:
+ raise Exception(
+ "Unsupported aggregate function {}".format(
+ expr.aggregate_type()
+ )
+ )
+ else:
+ raise Exception(
+ "Unsupported aggregate function {}".format(expr)
+ )
+ df = inputs[0].groupby(groupby_expr).agg(aggs)
+ return df
+ elif isinstance(node, TableScan):
+ return polars.read_parquet(self.parquet_tables[node.table_name()])
+ else:
+ raise Exception(
+ "unsupported logical operator: {}".format(type(node))
+ )
+
+ def sql(self, sql):
+ datafusion_df = self.datafusion_ctx.sql(sql)
+ plan = datafusion_df.logical_plan()
+ return self.to_polars_df(plan)
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 6faffaf..efa2ede 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -35,7 +35,6 @@
def test_create_context_with_all_valid_args():
-
runtime = (
RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
)
diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py
index 1894688..292a4b0 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -533,3 +533,14 @@
def test_count(df):
# Get number of rows
assert df.count() == 3
+
+
+def test_to_pandas(df):
+ # Skip test if pandas is not installed
+ pd = pytest.importorskip("pandas")
+
+ # Convert datafusion dataframe to pandas dataframe
+ pandas_df = df.to_pandas()
+ assert type(pandas_df) == pd.DataFrame
+ assert pandas_df.shape == (3, 3)
+ assert set(pandas_df.columns) == {"a", "b", "c"}
diff --git a/datafusion/tests/test_expr.py b/datafusion/tests/test_expr.py
new file mode 100644
index 0000000..143eea6
--- /dev/null
+++ b/datafusion/tests/test_expr.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+from datafusion.expr import Column, Literal, BinaryExpr, AggregateFunction
+from datafusion.expr import (
+ Projection,
+ Filter,
+ Aggregate,
+ Limit,
+ Sort,
+ TableScan,
+)
+import pytest
+
+
+@pytest.fixture
+def test_ctx():
+ ctx = SessionContext()
+ ctx.register_csv("test", "testing/data/csv/aggregate_test_100.csv")
+ return ctx
+
+
+def test_projection(test_ctx):
+ df = test_ctx.sql("select c1, 123, c1 < 123 from test")
+ plan = df.logical_plan()
+
+ plan = plan.to_variant()
+ assert isinstance(plan, Projection)
+
+ expr = plan.projections()
+
+ col1 = expr[0].to_variant()
+ assert isinstance(col1, Column)
+ assert col1.name() == "c1"
+ assert col1.qualified_name() == "test.c1"
+
+ col2 = expr[1].to_variant()
+ assert isinstance(col2, Literal)
+ assert col2.data_type() == "Int64"
+ assert col2.value_i64() == 123
+
+ col3 = expr[2].to_variant()
+ assert isinstance(col3, BinaryExpr)
+ assert isinstance(col3.left().to_variant(), Column)
+ assert col3.op() == "<"
+ assert isinstance(col3.right().to_variant(), Literal)
+
+ plan = plan.input().to_variant()
+ assert isinstance(plan, TableScan)
+
+
+def test_filter(test_ctx):
+ df = test_ctx.sql("select c1 from test WHERE c1 > 5")
+ plan = df.logical_plan()
+
+ plan = plan.to_variant()
+ assert isinstance(plan, Projection)
+
+ plan = plan.input().to_variant()
+ assert isinstance(plan, Filter)
+
+
+def test_limit(test_ctx):
+ df = test_ctx.sql("select c1 from test LIMIT 10")
+ plan = df.logical_plan()
+
+ plan = plan.to_variant()
+ assert isinstance(plan, Limit)
+
+
+def test_aggregate_query(test_ctx):
+ df = test_ctx.sql("select c1, count(*) from test group by c1")
+ plan = df.logical_plan()
+
+ projection = plan.to_variant()
+ assert isinstance(projection, Projection)
+
+ aggregate = projection.input().to_variant()
+ assert isinstance(aggregate, Aggregate)
+
+ col1 = aggregate.group_by_exprs()[0].to_variant()
+ assert isinstance(col1, Column)
+ assert col1.name() == "c1"
+ assert col1.qualified_name() == "test.c1"
+
+ col2 = aggregate.aggregate_exprs()[0].to_variant()
+ assert isinstance(col2, AggregateFunction)
+
+
+def test_sort(test_ctx):
+ df = test_ctx.sql("select c1 from test order by c1")
+ plan = df.logical_plan()
+
+ plan = plan.to_variant()
+ assert isinstance(plan, Sort)
diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index e5d9585..7eb8b7c 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -33,8 +33,17 @@
from datafusion.expr import (
Expr,
+ Column,
+ Literal,
+ BinaryExpr,
+ AggregateFunction,
Projection,
TableScan,
+ Filter,
+ Limit,
+ Aggregate,
+ Sort,
+ Analyze,
)
@@ -55,9 +64,23 @@
]:
assert klass.__module__ == "datafusion"
- for klass in [Expr, Projection, TableScan]:
+ # expressions
+ for klass in [Expr, Column, Literal, BinaryExpr, AggregateFunction]:
assert klass.__module__ == "datafusion.expr"
+ # operators
+ for klass in [
+ Projection,
+ TableScan,
+ Aggregate,
+ Sort,
+ Limit,
+ Filter,
+ Analyze,
+ ]:
+ assert klass.__module__ == "datafusion.expr"
+
+ # schema
for klass in [DFField, DFSchema]:
assert klass.__module__ == "datafusion.common"
diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh
index fee276c..be86f69 100755
--- a/dev/release/verify-release-candidate.sh
+++ b/dev/release/verify-release-candidate.sh
@@ -125,15 +125,19 @@
git clone https://github.com/apache/arrow-testing.git testing
git clone https://github.com/apache/parquet-testing.git parquet-testing
- cargo build
- cargo test --all
+ python3 -m venv venv
+ source venv/bin/activate
+ python3 -m pip install -U pip
+ python3 -m pip install -r requirements-310.txt
+ maturin develop
+
+ #TODO: we should really run tests here as well
+ #python3 -m pytest
if ( find -iname 'Cargo.toml' | xargs grep SNAPSHOT ); then
echo "Cargo.toml version should not contain SNAPSHOT for releases"
exit 1
fi
-
- cargo publish --dry-run
}
TEST_SUCCESS=no
diff --git a/examples/README.md b/examples/README.md
index a3ae0ba..ce98600 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -19,9 +19,31 @@
# DataFusion Python Examples
-- [Query a Parquet file using SQL](./sql-parquet.py)
-- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
-- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
-- [Query PyArrow Data](./query-pyarrow-data.py)
-- [Register a Python UDF with DataFusion](./python-udf.py)
-- [Register a Python UDAF with DataFusion](./python-udaf.py)
+Some examples rely on data which can be downloaded from the following site:
+
+- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
+
+Here is a direct link to the file used in the examples:
+
+- https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet
+
+### Executing Queries with DataFusion
+
+- [Query a Parquet file using SQL](./sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
+- [Query PyArrow Data](./query-pyarrow-data.py)
+
+### Running User-Defined Python Code
+
+- [Register a Python UDF with DataFusion](./python-udf.py)
+- [Register a Python UDAF with DataFusion](./python-udaf.py)
+
+### Substrait Support
+
+- [Serialize query plans using Substrait](./substrait.py)
+
+### Executing SQL against DataFrame Libraries (Experimental)
+
+- [Executing SQL on Polars](./sql-on-polars.py)
+- [Executing SQL on Pandas](./sql-on-pandas.py)
diff --git a/examples/dataframe-parquet.py b/examples/dataframe-parquet.py
index 31a8aa6..0f2e4b8 100644
--- a/examples/dataframe-parquet.py
+++ b/examples/dataframe-parquet.py
@@ -19,7 +19,7 @@
from datafusion import functions as f
ctx = SessionContext()
-df = ctx.read_parquet(
- "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-).aggregate([f.col("passenger_count")], [f.count_star()])
+df = ctx.read_parquet("yellow_tripdata_2021-01.parquet").aggregate(
+ [f.col("passenger_count")], [f.count_star()]
+)
df.show()
diff --git a/examples/sql-on-cudf.py b/examples/sql-on-cudf.py
new file mode 100644
index 0000000..407cb1f
--- /dev/null
+++ b/examples/sql-on-cudf.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion.cudf import SessionContext
+
+
+ctx = SessionContext()
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
+df = ctx.sql("select passenger_count from taxi")
+print(df)
diff --git a/examples/sql-on-pandas.py b/examples/sql-on-pandas.py
new file mode 100644
index 0000000..0efd776
--- /dev/null
+++ b/examples/sql-on-pandas.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion.pandas import SessionContext
+
+
+ctx = SessionContext()
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
+df = ctx.sql("select passenger_count from taxi")
+print(df)
diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py
new file mode 100644
index 0000000..c208114
--- /dev/null
+++ b/examples/sql-on-polars.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion.polars import SessionContext
+
+
+ctx = SessionContext()
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
+df = ctx.sql(
+ "select passenger_count, count(*) from taxi group by passenger_count"
+)
+print(df)
diff --git a/examples/sql-parquet.py b/examples/sql-parquet.py
index 7b2db6f..3cc9fbd 100644
--- a/examples/sql-parquet.py
+++ b/examples/sql-parquet.py
@@ -18,9 +18,7 @@
from datafusion import SessionContext
ctx = SessionContext()
-ctx.register_parquet(
- "taxi", "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-)
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
df = ctx.sql(
"select passenger_count, count(*) from taxi where passenger_count is not null group by passenger_count order by passenger_count"
)
diff --git a/examples/sql-to-pandas.py b/examples/sql-to-pandas.py
index 3569e6d..3e99b22 100644
--- a/examples/sql-to-pandas.py
+++ b/examples/sql-to-pandas.py
@@ -33,17 +33,11 @@
"order by passenger_count"
)
-# collect as list of pyarrow.RecordBatch
-results = df.collect()
-
-# get first batch
-batch = results[0]
-
# convert to Pandas
-df = batch.to_pandas()
+pandas_df = df.to_pandas()
# create a chart
-fig = df.plot(
+fig = pandas_df.plot(
kind="bar", title="Trip Count by Number of Passengers"
).get_figure()
fig.savefig("chart.png")
diff --git a/examples/substrait.py b/examples/substrait.py
new file mode 100644
index 0000000..c167f7d
--- /dev/null
+++ b/examples/substrait.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+from datafusion import substrait as ss
+
+
+# Create a DataFusion context
+ctx = SessionContext()
+
+# Register table with context
+ctx.register_parquet(
+ "aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv"
+)
+
+substrait_plan = ss.substrait.serde.serialize_to_plan(
+ "SELECT * FROM aggregate_test_data", ctx
+)
+# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+
+# Alternative serialization approaches
+# type(substrait_bytes) -> <class 'list'>; at this point the bytes can safely be written to a file, sent over the
+# network, etc., and subsequently deserialized on the receiving end.
+substrait_bytes = ss.substrait.serde.serialize_bytes(
+ "SELECT * FROM aggregate_test_data", ctx
+)
+
+# Imagine that the bytes are read back from the network, a file, etc. For brevity, this example simply reuses the variable.
+# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)
+
+# type(df_logical_plan) -> <class 'substrait.LogicalPlan'>
+df_logical_plan = ss.substrait.consumer.from_substrait_plan(
+ ctx, substrait_plan
+)
+
+# Back to Substrait Plan just for demonstration purposes
+# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan)
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 4b9fbca..a1c68dd 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -313,6 +313,24 @@
Ok(())
}
+    /// Convert this DataFrame to a Pandas DataFrame using pyarrow
+    /// Collect the batches, pass them to an Arrow Table, then convert to a Pandas DataFrame
+ fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
+        let batches = self.collect(py)?;
+
+ Python::with_gil(|py| {
+ // Instantiate pyarrow Table object and use its from_batches method
+ let table_class = py.import("pyarrow")?.getattr("Table")?;
+            let args = PyTuple::new(py, &[batches]);
+ let table: PyObject = table_class.call_method1("from_batches", args)?.into();
+
+ // Use Table.to_pandas() method to convert batches to pandas dataframe
+ // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas
+ let result = table.call_method0(py, "to_pandas")?;
+ Ok(result)
+ })
+ }
+
// Executes this DataFrame to get the total number of rows.
fn count(&self, py: Python) -> PyResult<usize> {
Ok(wait_for_future(py, self.df.as_ref().clone().count())?)
diff --git a/src/expr.rs b/src/expr.rs
index f3695fe..90ce6bf 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -22,10 +22,24 @@
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField};
+use crate::errors::py_runtime_err;
+use crate::expr::aggregate_expr::PyAggregateFunction;
+use crate::expr::binary_expr::PyBinaryExpr;
+use crate::expr::column::PyColumn;
+use crate::expr::literal::PyLiteral;
use datafusion::scalar::ScalarValue;
+pub mod aggregate;
+pub mod aggregate_expr;
+pub mod analyze;
+pub mod binary_expr;
+pub mod column;
+pub mod filter;
+pub mod limit;
+pub mod literal;
pub mod logical_node;
pub mod projection;
+pub mod sort;
pub mod table_scan;
/// A PyExpr that can be used on a DataFrame
@@ -49,6 +63,22 @@
#[pymethods]
impl PyExpr {
+ /// Return the specific expression
+ fn to_variant(&self, py: Python) -> PyResult<PyObject> {
+ Python::with_gil(|_| match &self.expr {
+ Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_py(py)),
+ Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_py(py)),
+ Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_py(py)),
+ Expr::AggregateFunction(expr) => {
+ Ok(PyAggregateFunction::from(expr.clone()).into_py(py))
+ }
+ other => Err(py_runtime_err(format!(
+ "Cannot convert this Expr to a Python object: {:?}",
+ other
+ ))),
+ })
+ }
+
fn __richcmp__(&self, other: PyExpr, op: CompareOp) -> PyExpr {
let expr = match op {
CompareOp::Lt => self.expr.clone().lt(other.expr),
@@ -140,8 +170,20 @@
/// Initializes the `expr` module to match the pattern of `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/
pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
+ // expressions
m.add_class::<PyExpr>()?;
+ m.add_class::<PyColumn>()?;
+ m.add_class::<PyLiteral>()?;
+ m.add_class::<PyBinaryExpr>()?;
+ m.add_class::<PyAggregateFunction>()?;
+ // operators
m.add_class::<table_scan::PyTableScan>()?;
m.add_class::<projection::PyProjection>()?;
+ m.add_class::<filter::PyFilter>()?;
+ m.add_class::<limit::PyLimit>()?;
+ m.add_class::<aggregate::PyAggregate>()?;
+ m.add_class::<sort::PySort>()?;
+ m.add_class::<analyze::PyAnalyze>()?;
Ok(())
}
diff --git a/src/expr/aggregate.rs b/src/expr/aggregate.rs
new file mode 100644
index 0000000..98d1f55
--- /dev/null
+++ b/src/expr/aggregate.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::Aggregate;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Aggregate", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAggregate {
+ aggregate: Aggregate,
+}
+
+impl From<Aggregate> for PyAggregate {
+ fn from(aggregate: Aggregate) -> PyAggregate {
+ PyAggregate { aggregate }
+ }
+}
+
+impl TryFrom<PyAggregate> for Aggregate {
+ type Error = DataFusionError;
+
+ fn try_from(agg: PyAggregate) -> Result<Self, Self::Error> {
+ Ok(agg.aggregate)
+ }
+}
+
+impl Display for PyAggregate {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ write!(
+ f,
+ "Aggregate
+ \nGroupBy(s): {:?}
+            \nAggregate(s): {:?}
+ \nInput: {:?}
+ \nProjected Schema: {:?}",
+ &self.aggregate.group_expr,
+ &self.aggregate.aggr_expr,
+ self.aggregate.input,
+ self.aggregate.schema
+ )
+ }
+}
+
+#[pymethods]
+impl PyAggregate {
+ /// Retrieves the grouping expressions for this `Aggregate`
+ fn group_by_exprs(&self) -> PyResult<Vec<PyExpr>> {
+ Ok(self
+ .aggregate
+ .group_expr
+ .iter()
+ .map(|e| PyExpr::from(e.clone()))
+ .collect())
+ }
+
+ /// Retrieves the aggregate expressions for this `Aggregate`
+ fn aggregate_exprs(&self) -> PyResult<Vec<PyExpr>> {
+ Ok(self
+ .aggregate
+ .aggr_expr
+ .iter()
+ .map(|e| PyExpr::from(e.clone()))
+ .collect())
+ }
+
+    /// Retrieves the input `LogicalPlan` to this `Aggregate` node
+ fn input(&self) -> PyLogicalPlan {
+ PyLogicalPlan::from((*self.aggregate.input).clone())
+ }
+
+    /// Resulting Schema for this `Aggregate` node instance
+ fn schema(&self) -> PyDFSchema {
+ (*self.aggregate.schema).clone().into()
+ }
+
+ fn __repr__(&self) -> PyResult<String> {
+ Ok(format!("Aggregate({})", self))
+ }
+}
+
+impl LogicalNode for PyAggregate {
+ fn input(&self) -> Vec<PyLogicalPlan> {
+ vec![PyLogicalPlan::from((*self.aggregate.input).clone())]
+ }
+}
diff --git a/src/expr/aggregate_expr.rs b/src/expr/aggregate_expr.rs
new file mode 100644
index 0000000..1801051
--- /dev/null
+++ b/src/expr/aggregate_expr.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::PyExpr;
+use datafusion_expr::expr::AggregateFunction;
+use pyo3::prelude::*;
+use std::fmt::{Display, Formatter};
+
+#[pyclass(name = "AggregateFunction", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAggregateFunction {
+ aggr: AggregateFunction,
+}
+
+impl From<PyAggregateFunction> for AggregateFunction {
+ fn from(aggr: PyAggregateFunction) -> Self {
+ aggr.aggr
+ }
+}
+
+impl From<AggregateFunction> for PyAggregateFunction {
+ fn from(aggr: AggregateFunction) -> PyAggregateFunction {
+ PyAggregateFunction { aggr }
+ }
+}
+
+impl Display for PyAggregateFunction {
+ fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+ let args: Vec<String> = self.aggr.args.iter().map(|expr| expr.to_string()).collect();
+ write!(f, "{}({})", self.aggr.fun, args.join(", "))
+ }
+}
+
+#[pymethods]
+impl PyAggregateFunction {
+    /// Get the aggregate type, such as "MIN" or "MAX"
+ fn aggregate_type(&self) -> String {
+ format!("{}", self.aggr.fun)
+ }
+
+    /// Is this a distinct aggregate, such as `COUNT(DISTINCT expr)`?
+ fn is_distinct(&self) -> bool {
+ self.aggr.distinct
+ }
+
+ /// Get the arguments to the aggregate function
+ fn args(&self) -> Vec<PyExpr> {
+ self.aggr
+ .args
+ .iter()
+ .map(|expr| PyExpr::from(expr.clone()))
+ .collect()
+ }
+
+    /// Get a String representation of this aggregate function
+ fn __repr__(&self) -> String {
+ format!("{}", self)
+ }
+}
diff --git a/src/expr/analyze.rs b/src/expr/analyze.rs
new file mode 100644
index 0000000..095fab0
--- /dev/null
+++ b/src/expr/analyze.rs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Analyze;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Analyze", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAnalyze {
+ analyze: Analyze,
+}
+
+impl PyAnalyze {
+ pub fn new(analyze: Analyze) -> Self {
+ Self { analyze }
+ }
+}
+
+impl From<Analyze> for PyAnalyze {
+ fn from(analyze: Analyze) -> PyAnalyze {
+ PyAnalyze { analyze }
+ }
+}
+
+impl From<PyAnalyze> for Analyze {
+ fn from(analyze: PyAnalyze) -> Self {
+ analyze.analyze
+ }
+}
+
+impl Display for PyAnalyze {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ write!(f, "Analyze Table")
+ }
+}
+
+#[pymethods]
+impl PyAnalyze {
+ fn verbose(&self) -> PyResult<bool> {
+ Ok(self.analyze.verbose)
+ }
+
+ /// Resulting Schema for this `Analyze` node instance
+ fn schema(&self) -> PyResult<PyDFSchema> {
+ Ok((*self.analyze.schema).clone().into())
+ }
+
+ fn __repr__(&self) -> PyResult<String> {
+ Ok(format!("Analyze({})", self))
+ }
+}
+
+impl LogicalNode for PyAnalyze {
+ fn input(&self) -> Vec<PyLogicalPlan> {
+ vec![PyLogicalPlan::from((*self.analyze.input).clone())]
+ }
+}
diff --git a/src/expr/binary_expr.rs b/src/expr/binary_expr.rs
new file mode 100644
index 0000000..5f382b7
--- /dev/null
+++ b/src/expr/binary_expr.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::PyExpr;
+use datafusion_expr::BinaryExpr;
+use pyo3::prelude::*;
+
+#[pyclass(name = "BinaryExpr", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyBinaryExpr {
+ expr: BinaryExpr,
+}
+
+impl From<PyBinaryExpr> for BinaryExpr {
+ fn from(expr: PyBinaryExpr) -> Self {
+ expr.expr
+ }
+}
+
+impl From<BinaryExpr> for PyBinaryExpr {
+ fn from(expr: BinaryExpr) -> PyBinaryExpr {
+ PyBinaryExpr { expr }
+ }
+}
+
+#[pymethods]
+impl PyBinaryExpr {
+ fn left(&self) -> PyExpr {
+ self.expr.left.as_ref().clone().into()
+ }
+
+ fn right(&self) -> PyExpr {
+ self.expr.right.as_ref().clone().into()
+ }
+
+ fn op(&self) -> String {
+ format!("{}", self.expr.op)
+ }
+
+ fn __repr__(&self) -> PyResult<String> {
+ Ok(format!("{}", self.expr))
+ }
+}
diff --git a/src/expr/column.rs b/src/expr/column.rs
new file mode 100644
index 0000000..16b8bce
--- /dev/null
+++ b/src/expr/column.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::Column;
+use pyo3::prelude::*;
+
+#[pyclass(name = "Column", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyColumn {
+ pub col: Column,
+}
+
+impl PyColumn {
+ pub fn new(col: Column) -> Self {
+ Self { col }
+ }
+}
+
+impl From<Column> for PyColumn {
+ fn from(col: Column) -> PyColumn {
+ PyColumn { col }
+ }
+}
+
+#[pymethods]
+impl PyColumn {
+ /// Get the column name
+ fn name(&self) -> String {
+ self.col.name.clone()
+ }
+
+ /// Get the column relation
+ fn relation(&self) -> Option<String> {
+ self.col.relation.clone()
+ }
+
+ /// Get the fully-qualified column name
+ fn qualified_name(&self) -> String {
+ self.col.flat_name()
+ }
+
+ /// Get a String representation of this column
+ fn __repr__(&self) -> String {
+ self.qualified_name()
+ }
+}
diff --git a/src/expr/filter.rs b/src/expr/filter.rs
new file mode 100644
index 0000000..b7b48b9
--- /dev/null
+++ b/src/expr/filter.rs
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Filter;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Filter", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyFilter {
+ filter: Filter,
+}
+
+impl From<Filter> for PyFilter {
+ fn from(filter: Filter) -> PyFilter {
+ PyFilter { filter }
+ }
+}
+
+impl From<PyFilter> for Filter {
+ fn from(filter: PyFilter) -> Self {
+ filter.filter
+ }
+}
+
+impl Display for PyFilter {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ write!(
+ f,
+ "Filter
+ \nPredicate: {:?}
+ \nInput: {:?}",
+ &self.filter.predicate, &self.filter.input
+ )
+ }
+}
+
+#[pymethods]
+impl PyFilter {
+ /// Retrieves the predicate expression for this `Filter`
+ fn predicate(&self) -> PyExpr {
+ PyExpr::from(self.filter.predicate.clone())
+ }
+
+ /// Retrieves the input `LogicalPlan` to this `Filter` node
+ fn input(&self) -> PyLogicalPlan {
+ PyLogicalPlan::from((*self.filter.input).clone())
+ }
+
+ /// Resulting Schema for this `Filter` node instance
+ fn schema(&self) -> PyDFSchema {
+ self.filter.input.schema().as_ref().clone().into()
+ }
+
+ fn __repr__(&self) -> String {
+ format!("Filter({})", self)
+ }
+}
+
+impl LogicalNode for PyFilter {
+ fn input(&self) -> Vec<PyLogicalPlan> {
+ vec![PyLogicalPlan::from((*self.filter.input).clone())]
+ }
+}
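A minimal sketch of the `Filter` bindings in use, under the same assumptions as the Analyze sketch above; the Projection-over-Filter plan shape is the usual one for this query but is itself an assumption:

from datafusion import SessionContext

ctx = SessionContext()
ctx.register_csv("t", "t.csv")
df = ctx.sql("SELECT a FROM t WHERE a > 1")
plan = df.logical_plan()              # typically Projection -> Filter -> TableScan
filt = plan.inputs()[0].to_variant()  # step below the Projection to the Filter
print(filt.predicate())               # the predicate as an Expr
print(filt.input().display())         # the plan feeding the Filter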
diff --git a/src/expr/limit.rs b/src/expr/limit.rs
new file mode 100644
index 0000000..a50e5b8
--- /dev/null
+++ b/src/expr/limit.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Limit;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Limit", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyLimit {
+ limit: Limit,
+}
+
+impl From<Limit> for PyLimit {
+ fn from(limit: Limit) -> PyLimit {
+ PyLimit { limit }
+ }
+}
+
+impl From<PyLimit> for Limit {
+ fn from(limit: PyLimit) -> Self {
+ limit.limit
+ }
+}
+
+impl Display for PyLimit {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ write!(
+ f,
+ "Limit
+ \nSkip: {}
+ \nFetch: {:?}
+ \nInput: {:?}",
+ &self.limit.skip, &self.limit.fetch, &self.limit.input
+ )
+ }
+}
+
+#[pymethods]
+impl PyLimit {
+ /// Retrieves the skip value for this `Limit`
+ fn skip(&self) -> usize {
+ self.limit.skip
+ }
+
+ /// Retrieves the fetch value for this `Limit`
+ fn fetch(&self) -> Option<usize> {
+ self.limit.fetch
+ }
+
+ /// Retrieves the input `LogicalPlan` to this `Limit` node
+ fn input(&self) -> PyLogicalPlan {
+ PyLogicalPlan::from((*self.limit.input).clone())
+ }
+
+ /// Resulting Schema for this `Limit` node instance
+ fn schema(&self) -> PyResult<PyDFSchema> {
+ Ok(self.limit.input.schema().as_ref().clone().into())
+ }
+
+ fn __repr__(&self) -> PyResult<String> {
+ Ok(format!("Limit({})", self))
+ }
+}
+
+impl LogicalNode for PyLimit {
+ fn input(&self) -> Vec<PyLogicalPlan> {
+ vec![PyLogicalPlan::from((*self.limit.input).clone())]
+ }
+}
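`skip` and `fetch` mirror SQL's OFFSET and LIMIT respectively; a sketch under the same assumptions as above:

from datafusion import SessionContext

ctx = SessionContext()
ctx.register_csv("t", "t.csv")
df = ctx.sql("SELECT a FROM t LIMIT 10 OFFSET 5")
lim = df.logical_plan().to_variant()  # the Limit node sits at the plan root here
print(lim.skip())   # 5, from OFFSET
print(lim.fetch())  # 10, from LIMIT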
diff --git a/src/expr/literal.rs b/src/expr/literal.rs
new file mode 100644
index 0000000..27674ce
--- /dev/null
+++ b/src/expr/literal.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::py_runtime_err;
+use datafusion_common::ScalarValue;
+use pyo3::prelude::*;
+
+#[pyclass(name = "Literal", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyLiteral {
+ pub value: ScalarValue,
+}
+
+impl From<PyLiteral> for ScalarValue {
+ fn from(lit: PyLiteral) -> ScalarValue {
+ lit.value
+ }
+}
+
+impl From<ScalarValue> for PyLiteral {
+ fn from(value: ScalarValue) -> PyLiteral {
+ PyLiteral { value }
+ }
+}
+
+#[pymethods]
+impl PyLiteral {
+ /// Get the data type of this literal value
+ fn data_type(&self) -> String {
+ format!("{}", self.value.get_datatype())
+ }
+
+ fn value_i32(&self) -> PyResult<i32> {
+ if let ScalarValue::Int32(Some(n)) = &self.value {
+ Ok(*n)
+ } else {
+ Err(py_runtime_err("Cannot access value as i32"))
+ }
+ }
+
+ fn value_i64(&self) -> PyResult<i64> {
+ if let ScalarValue::Int64(Some(n)) = &self.value {
+ Ok(*n)
+ } else {
+ Err(py_runtime_err("Cannot access value as i64"))
+ }
+ }
+
+ fn value_str(&self) -> PyResult<String> {
+ if let ScalarValue::Utf8(Some(s)) = &self.value {
+ Ok(s.clone())
+ } else {
+ Err(py_runtime_err("Cannot access value as string"))
+ }
+ }
+
+ fn __repr__(&self) -> PyResult<String> {
+ Ok(format!("{}", self.value))
+ }
+}
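The three expression wrappers in this diff (`BinaryExpr`, `Column`, `Literal`) compose: a comparison predicate splits into a column on the left, an operator string, and a literal on the right. The sketch below additionally assumes an `Expr`-level `to_variant()` counterpart to the plan-level one added later in this diff; that method is not part of this patch:

from datafusion import SessionContext

ctx = SessionContext()
ctx.register_csv("t", "t.csv")
df = ctx.sql("SELECT a FROM t WHERE a > 1")
filt = df.logical_plan().inputs()[0].to_variant()  # the Filter node
pred = filt.predicate().to_variant()               # hypothetical Expr-level to_variant()
print(pred.left().to_variant().qualified_name())   # "t.a" (a Column)
print(pred.op())                                   # ">"
print(pred.right().to_variant().value_i64())       # 1 (an Int64 Literal)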
diff --git a/src/expr/projection.rs b/src/expr/projection.rs
index 6d04e59..4c158f7 100644
--- a/src/expr/projection.rs
+++ b/src/expr/projection.rs
@@ -15,13 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use datafusion_common::DataFusionError;
use datafusion_expr::logical_plan::Projection;
use pyo3::prelude::*;
use std::fmt::{self, Display, Formatter};
use crate::common::df_schema::PyDFSchema;
-use crate::errors::py_runtime_err;
use crate::expr::logical_node::LogicalNode;
use crate::expr::PyExpr;
use crate::sql::logical::PyLogicalPlan;
@@ -32,21 +30,21 @@
projection: Projection,
}
+impl PyProjection {
+ pub fn new(projection: Projection) -> Self {
+ Self { projection }
+ }
+}
+
impl From<Projection> for PyProjection {
fn from(projection: Projection) -> PyProjection {
PyProjection { projection }
}
}
-impl TryFrom<PyProjection> for Projection {
- type Error = DataFusionError;
-
- fn try_from(py_proj: PyProjection) -> Result<Self, Self::Error> {
- Projection::try_new_with_schema(
- py_proj.projection.expr,
- py_proj.projection.input.clone(),
- py_proj.projection.schema,
- )
+impl From<PyProjection> for Projection {
+ fn from(proj: PyProjection) -> Self {
+ proj.projection
}
}
@@ -66,8 +64,7 @@
#[pymethods]
impl PyProjection {
/// Retrieves the expressions for this `Projection`
- #[pyo3(name = "projections")]
- fn py_projections(&self) -> PyResult<Vec<PyExpr>> {
+ fn projections(&self) -> PyResult<Vec<PyExpr>> {
Ok(self
.projection
.expr
@@ -76,25 +73,13 @@
.collect())
}
- // Retrieves the input `LogicalPlan` to this `Projection` node
- #[pyo3(name = "input")]
- fn py_input(&self) -> PyResult<PyLogicalPlan> {
- // DataFusion make a loose guarantee that each Projection should have an input, however
- // we check for that hear since we are performing explicit index retrieval
- let inputs = LogicalNode::input(self);
- if !inputs.is_empty() {
- return Ok(inputs[0].clone());
- }
-
- Err(py_runtime_err(format!(
- "Expected `input` field for Projection node: {}",
- self
- )))
+ /// Retrieves the input `LogicalPlan` to this `Projection` node
+ fn input(&self) -> PyLogicalPlan {
+ PyLogicalPlan::from((*self.projection.input).clone())
}
- // Resulting Schema for this `Projection` node instance
- #[pyo3(name = "schema")]
- fn py_schema(&self) -> PyResult<PyDFSchema> {
+ /// Resulting Schema for this `Projection` node instance
+ fn schema(&self) -> PyResult<PyDFSchema> {
Ok((*self.projection.schema).clone().into())
}
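With the renamed accessors above, `projections()` yields one expression per output column; a brief sketch, same assumptions as the earlier ones:

from datafusion import SessionContext

ctx = SessionContext()
ctx.register_csv("t", "t.csv")
df = ctx.sql("SELECT a, b FROM t")
proj = df.logical_plan().to_variant()  # root expected to be the Projection
for expr in proj.projections():        # one Expr per output column
    print(expr)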
diff --git a/src/expr/sort.rs b/src/expr/sort.rs
new file mode 100644
index 0000000..1d0a7f6
--- /dev/null
+++ b/src/expr/sort.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Sort;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Sort", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PySort {
+ sort: Sort,
+}
+
+impl From<Sort> for PySort {
+ fn from(sort: Sort) -> PySort {
+ PySort { sort }
+ }
+}
+
+impl TryFrom<PySort> for Sort {
+ type Error = DataFusionError;
+
+ fn try_from(agg: PySort) -> Result<Self, Self::Error> {
+ Ok(agg.sort)
+ }
+}
+
+impl Display for PySort {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ write!(
+ f,
+ "Sort
+ \nExpr(s): {:?}
+ \nInput: {:?}
+ \nSchema: {:?}",
+ &self.sort.expr,
+ self.sort.input,
+ self.sort.input.schema()
+ )
+ }
+}
+
+#[pymethods]
+impl PySort {
+ /// Retrieves the sort expressions for this `Sort`
+ fn sort_exprs(&self) -> PyResult<Vec<PyExpr>> {
+ Ok(self
+ .sort
+ .expr
+ .iter()
+ .map(|e| PyExpr::from(e.clone()))
+ .collect())
+ }
+
+ /// Retrieves the input `LogicalPlan` to this `Sort` node
+ fn input(&self) -> PyLogicalPlan {
+ PyLogicalPlan::from((*self.sort.input).clone())
+ }
+
+ /// Resulting Schema for this `Sort` node instance
+ fn schema(&self) -> PyDFSchema {
+ self.sort.input.schema().as_ref().clone().into()
+ }
+
+ fn __repr__(&self) -> PyResult<String> {
+ Ok(format!("Sort({})", self))
+ }
+}
+
+impl LogicalNode for PySort {
+ fn input(&self) -> Vec<PyLogicalPlan> {
+ vec![PyLogicalPlan::from((*self.sort.input).clone())]
+ }
+}
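For `Sort`, the expressions carry the ordering; a sketch under the same assumptions, noting that DataFusion normally plans the Sort above the Projection:

from datafusion import SessionContext

ctx = SessionContext()
ctx.register_csv("t", "t.csv")
df = ctx.sql("SELECT a FROM t ORDER BY a DESC")
srt = df.logical_plan().to_variant()  # Sort is planned above the Projection
print(srt.sort_exprs())               # the ORDER BY expressions
print(srt.input().display())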
diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs
index 00504b9..2784523 100644
--- a/src/expr/table_scan.rs
+++ b/src/expr/table_scan.rs
@@ -19,6 +19,8 @@
use pyo3::prelude::*;
use std::fmt::{self, Display, Formatter};
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
use crate::{common::df_schema::PyDFSchema, expr::PyExpr};
#[pyclass(name = "TableScan", module = "datafusion.expr", subclass)]
@@ -27,6 +29,12 @@
table_scan: TableScan,
}
+impl PyTableScan {
+ pub fn new(table_scan: TableScan) -> Self {
+ Self { table_scan }
+ }
+}
+
impl From<PyTableScan> for TableScan {
fn from(tbl_scan: PyTableScan) -> TableScan {
tbl_scan.table_scan
@@ -117,3 +125,10 @@
Ok(format!("TableScan({})", self))
}
}
+
+impl LogicalNode for PyTableScan {
+ fn input(&self) -> Vec<PyLogicalPlan> {
+ // table scans are leaf nodes and do not have inputs
+ vec![]
+ }
+}
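The `LogicalNode` impl above encodes that scans are leaves; the same is visible from Python through `inputs()`. A sketch, same assumptions:

from datafusion import SessionContext

ctx = SessionContext()
ctx.register_csv("t", "t.csv")
plan = ctx.sql("SELECT a FROM t").logical_plan()
scan = plan.inputs()[0]    # the TableScan below the Projection
print(scan.to_variant())   # TableScan(...)
print(scan.inputs())       # [] since scans are leaf nodes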
diff --git a/src/sql/logical.rs b/src/sql/logical.rs
index dcd7baa..ee48f1e 100644
--- a/src/sql/logical.rs
+++ b/src/sql/logical.rs
@@ -17,6 +17,14 @@
use std::sync::Arc;
+use crate::errors::py_runtime_err;
+use crate::expr::aggregate::PyAggregate;
+use crate::expr::analyze::PyAnalyze;
+use crate::expr::filter::PyFilter;
+use crate::expr::limit::PyLimit;
+use crate::expr::projection::PyProjection;
+use crate::expr::sort::PySort;
+use crate::expr::table_scan::PyTableScan;
use datafusion_expr::LogicalPlan;
use pyo3::prelude::*;
@@ -33,12 +41,33 @@
plan: Arc::new(plan),
}
}
+
+ pub fn plan(&self) -> Arc<LogicalPlan> {
+ self.plan.clone()
+ }
}
#[pymethods]
impl PyLogicalPlan {
+ /// Return the specific logical operator
+ fn to_variant(&self, py: Python) -> PyResult<PyObject> {
+ match self.plan.as_ref() {
+ LogicalPlan::Aggregate(plan) => Ok(PyAggregate::from(plan.clone()).into_py(py)),
+ LogicalPlan::Analyze(plan) => Ok(PyAnalyze::from(plan.clone()).into_py(py)),
+ LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)),
+ LogicalPlan::Limit(plan) => Ok(PyLimit::from(plan.clone()).into_py(py)),
+ LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)),
+ LogicalPlan::Sort(plan) => Ok(PySort::from(plan.clone()).into_py(py)),
+ LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)),
+ other => Err(py_runtime_err(format!(
+ "Cannot convert this plan to a LogicalNode: {:?}",
+ other
+ ))),
+ }
+ }
+
/// Get the inputs to this plan
- pub fn inputs(&self) -> Vec<PyLogicalPlan> {
+ fn inputs(&self) -> Vec<PyLogicalPlan> {
let mut inputs = vec![];
for input in self.plan.inputs() {
inputs.push(input.to_owned().into());
@@ -46,19 +75,23 @@
inputs
}
- pub fn display(&self) -> String {
+ fn __repr__(&self) -> PyResult<String> {
+ Ok(format!("{:?}", self.plan))
+ }
+
+ fn display(&self) -> String {
format!("{}", self.plan.display())
}
- pub fn display_indent(&self) -> String {
+ fn display_indent(&self) -> String {
format!("{}", self.plan.display_indent())
}
- pub fn display_indent_schema(&self) -> String {
+ fn display_indent_schema(&self) -> String {
format!("{}", self.plan.display_indent_schema())
}
- pub fn display_graphviz(&self) -> String {
+ fn display_graphviz(&self) -> String {
format!("{}", self.plan.display_indent_schema())
}
}
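The display helpers and `to_variant` combine into a small plan-inspection workflow from Python; a closing sketch under the same assumptions as the earlier ones:

from datafusion import SessionContext

ctx = SessionContext()
ctx.register_csv("t", "t.csv")
plan = ctx.sql("SELECT a FROM t WHERE a > 1 LIMIT 5").logical_plan()
print(plan.display_indent())         # indented tree, one node per line
print(plan.display_indent_schema())  # same tree, annotated with schemas
print(plan.display_graphviz())       # DOT source for the plan graph
node = plan.to_variant()             # typed wrapper for the root node, a Limit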