[branch-0.8] Merge from main (#201)

* changelog (#188)

* Add Python wrapper for LogicalPlan::Sort (#196)

* Add Python wrapper for LogicalPlan::Aggregate (#195)

* Add Python wrapper for LogicalPlan::Limit (#193)

* Add Python wrapper for LogicalPlan::Filter (#192)

* Add Python wrapper for LogicalPlan::Filter

* clippy

* clippy

* Update src/expr/filter.rs

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>

---------

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>

* Add tests for recently added functionality (#199)

* Add experimental support for executing SQL with Polars and Pandas (#190)

* Run `maturin develop` instead of `cargo build` in verification script (#200)

* Implement `to_pandas()` (#197)

* Implement to_pandas()

* Update documentation

* Write unit test

* Add support for cudf as a physical execution engine (#205)

* Update README in preparation for 0.8 release (#206)

* Analyze table bindings (#204)

* method for getting the internal LogicalPlan instance

* Add explain plan method

* Add bindings for analyze table

* Add to_variant

* cargo fmt

* blake and flake formatting

* changelog (#209)

---------

Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Dejan Simic <10134699+simicd@users.noreply.github.com>
Co-authored-by: Jeremy Dyer <jdye64@gmail.com>
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8d47fdf..f196916 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,50 +19,36 @@
 
 # Changelog
 
-## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) (2023-02-17)
+## [0.8.0](https://github.com/apache/arrow-datafusion-python/tree/0.8.0) (2023-02-22)
 
-[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0...0.8.0)
+[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.8.0-rc1...0.8.0)
 
 **Implemented enhancements:**
 
-- Add bindings for datafusion\_common::DFField [\#184](https://github.com/apache/arrow-datafusion-python/issues/184)
-- Add bindings for DFSchema/DFSchemaRef [\#181](https://github.com/apache/arrow-datafusion-python/issues/181)
-- Add bindings for datafusion\_expr Projection [\#179](https://github.com/apache/arrow-datafusion-python/issues/179)
-- Add bindings for `TableScan` struct from `datafusion_expr::TableScan` [\#177](https://github.com/apache/arrow-datafusion-python/issues/177)
-- Add a "mapping" struct for types [\#172](https://github.com/apache/arrow-datafusion-python/issues/172)
-- Improve string representation of datafusion classes \(dataframe, context, expression, ...\) [\#158](https://github.com/apache/arrow-datafusion-python/issues/158)
-- Add DataFrame count method [\#151](https://github.com/apache/arrow-datafusion-python/issues/151)
-- \[REQUEST\] Github Actions Improvements [\#146](https://github.com/apache/arrow-datafusion-python/issues/146)
-- Change default branch name from master to main [\#144](https://github.com/apache/arrow-datafusion-python/issues/144)
-- Bump pyo3 to 0.18.0 [\#140](https://github.com/apache/arrow-datafusion-python/issues/140)
-- Add script for Python linting [\#134](https://github.com/apache/arrow-datafusion-python/issues/134)
-- Add Python bindings for substrait module [\#132](https://github.com/apache/arrow-datafusion-python/issues/132)
-- Expand unit tests for built-in functions [\#128](https://github.com/apache/arrow-datafusion-python/issues/128)
-- support creating arrow-datafusion-python conda environment [\#122](https://github.com/apache/arrow-datafusion-python/issues/122)
-- Build Python source distribution in GitHub workflow [\#81](https://github.com/apache/arrow-datafusion-python/issues/81)
-- EPIC: Add all functions to python binding `functions` [\#72](https://github.com/apache/arrow-datafusion-python/issues/72)
+- Add support for cuDF physical execution engine [\#202](https://github.com/apache/arrow-datafusion-python/issues/202)
+- Make it easier to create a Pandas dataframe from DataFusion query results [\#139](https://github.com/apache/arrow-datafusion-python/issues/139)
 
 **Fixed bugs:**
 
-- Build is broken [\#161](https://github.com/apache/arrow-datafusion-python/issues/161)
-- Out of memory when sorting [\#157](https://github.com/apache/arrow-datafusion-python/issues/157)
-- window\_lead test appears to be non-deterministic [\#135](https://github.com/apache/arrow-datafusion-python/issues/135)
-- Reading csv does not work [\#130](https://github.com/apache/arrow-datafusion-python/issues/130)
-- Github actions produce a lot of warnings [\#94](https://github.com/apache/arrow-datafusion-python/issues/94)
-- ASF source release tarball has wrong directory name [\#90](https://github.com/apache/arrow-datafusion-python/issues/90)
-- Python Release Build failing after upgrading to maturin 14.2 [\#87](https://github.com/apache/arrow-datafusion-python/issues/87)
-- Maturin build hangs on Linux ARM64 [\#84](https://github.com/apache/arrow-datafusion-python/issues/84)
-- Cannot install on Mac M1 from source tarball from testpypi [\#82](https://github.com/apache/arrow-datafusion-python/issues/82)
-- ImportPathMismatchError when running pytest locally [\#77](https://github.com/apache/arrow-datafusion-python/issues/77)
+- Build error: could not compile `thiserror` due to 2 previous errors [\#69](https://github.com/apache/arrow-datafusion-python/issues/69)
 
 **Closed issues:**
 
-- Publish documentation for Python bindings [\#39](https://github.com/apache/arrow-datafusion-python/issues/39)
-- Add Python binding for `approx_median` [\#32](https://github.com/apache/arrow-datafusion-python/issues/32)
-- Release version 0.7.0 [\#7](https://github.com/apache/arrow-datafusion-python/issues/7)
+- Integrate with the new `object_store` crate [\#22](https://github.com/apache/arrow-datafusion-python/issues/22)
 
 **Merged pull requests:**
 
+- Update README in preparation for 0.8 release [\#206](https://github.com/apache/arrow-datafusion-python/pull/206) ([andygrove](https://github.com/andygrove))
+- Add support for cudf as a physical execution engine [\#205](https://github.com/apache/arrow-datafusion-python/pull/205) ([jdye64](https://github.com/jdye64))
+- Run `maturin develop` instead of `cargo build` in verification script [\#200](https://github.com/apache/arrow-datafusion-python/pull/200) ([andygrove](https://github.com/andygrove))
+- Add tests for recently added functionality [\#199](https://github.com/apache/arrow-datafusion-python/pull/199) ([andygrove](https://github.com/andygrove))
+- Implement `to_pandas()` [\#197](https://github.com/apache/arrow-datafusion-python/pull/197) ([simicd](https://github.com/simicd))
+- Add Python wrapper for LogicalPlan::Sort [\#196](https://github.com/apache/arrow-datafusion-python/pull/196) ([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Aggregate [\#195](https://github.com/apache/arrow-datafusion-python/pull/195) ([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Limit [\#193](https://github.com/apache/arrow-datafusion-python/pull/193) ([andygrove](https://github.com/andygrove))
+- Add Python wrapper for LogicalPlan::Filter [\#192](https://github.com/apache/arrow-datafusion-python/pull/192) ([andygrove](https://github.com/andygrove))
+- Add experimental support for executing SQL with Polars and Pandas [\#190](https://github.com/apache/arrow-datafusion-python/pull/190) ([andygrove](https://github.com/andygrove))
+- Update changelog for 0.8 release [\#188](https://github.com/apache/arrow-datafusion-python/pull/188) ([andygrove](https://github.com/andygrove))
 - Add ability to execute ExecutionPlan and get a stream of RecordBatch [\#186](https://github.com/apache/arrow-datafusion-python/pull/186) ([andygrove](https://github.com/andygrove))
 - Dffield bindings [\#185](https://github.com/apache/arrow-datafusion-python/pull/185) ([jdye64](https://github.com/jdye64))
 - Add bindings for DFSchema [\#183](https://github.com/apache/arrow-datafusion-python/pull/183) ([jdye64](https://github.com/jdye64))
@@ -118,6 +104,52 @@
 - Update release instructions [\#83](https://github.com/apache/arrow-datafusion-python/pull/83) ([andygrove](https://github.com/andygrove))
 - \[Functions\] - Add python function binding to `functions` [\#73](https://github.com/apache/arrow-datafusion-python/pull/73) ([francis-du](https://github.com/francis-du))
 
+## [0.8.0-rc1](https://github.com/apache/arrow-datafusion-python/tree/0.8.0-rc1) (2023-02-17)
+
+[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0-rc2...0.8.0-rc1)
+
+**Implemented enhancements:**
+
+- Add bindings for datafusion\_common::DFField [\#184](https://github.com/apache/arrow-datafusion-python/issues/184)
+- Add bindings for DFSchema/DFSchemaRef [\#181](https://github.com/apache/arrow-datafusion-python/issues/181)
+- Add bindings for datafusion\_expr Projection [\#179](https://github.com/apache/arrow-datafusion-python/issues/179)
+- Add bindings for `TableScan` struct from `datafusion_expr::TableScan` [\#177](https://github.com/apache/arrow-datafusion-python/issues/177)
+- Add a "mapping" struct for types [\#172](https://github.com/apache/arrow-datafusion-python/issues/172)
+- Improve string representation of datafusion classes \(dataframe, context, expression, ...\) [\#158](https://github.com/apache/arrow-datafusion-python/issues/158)
+- Add DataFrame count method [\#151](https://github.com/apache/arrow-datafusion-python/issues/151)
+- \[REQUEST\] Github Actions Improvements [\#146](https://github.com/apache/arrow-datafusion-python/issues/146)
+- Change default branch name from master to main [\#144](https://github.com/apache/arrow-datafusion-python/issues/144)
+- Bump pyo3 to 0.18.0 [\#140](https://github.com/apache/arrow-datafusion-python/issues/140)
+- Add script for Python linting [\#134](https://github.com/apache/arrow-datafusion-python/issues/134)
+- Add Python bindings for substrait module [\#132](https://github.com/apache/arrow-datafusion-python/issues/132)
+- Expand unit tests for built-in functions [\#128](https://github.com/apache/arrow-datafusion-python/issues/128)
+- support creating arrow-datafusion-python conda environment [\#122](https://github.com/apache/arrow-datafusion-python/issues/122)
+- Build Python source distribution in GitHub workflow [\#81](https://github.com/apache/arrow-datafusion-python/issues/81)
+- EPIC: Add all functions to python binding `functions` [\#72](https://github.com/apache/arrow-datafusion-python/issues/72)
+
+**Fixed bugs:**
+
+- Build is broken [\#161](https://github.com/apache/arrow-datafusion-python/issues/161)
+- Out of memory when sorting [\#157](https://github.com/apache/arrow-datafusion-python/issues/157)
+- window\_lead test appears to be non-deterministic [\#135](https://github.com/apache/arrow-datafusion-python/issues/135)
+- Reading csv does not work [\#130](https://github.com/apache/arrow-datafusion-python/issues/130)
+- Github actions produce a lot of warnings [\#94](https://github.com/apache/arrow-datafusion-python/issues/94)
+- ASF source release tarball has wrong directory name [\#90](https://github.com/apache/arrow-datafusion-python/issues/90)
+- Python Release Build failing after upgrading to maturin 14.2 [\#87](https://github.com/apache/arrow-datafusion-python/issues/87)
+- Maturin build hangs on Linux ARM64 [\#84](https://github.com/apache/arrow-datafusion-python/issues/84)
+- Cannot install on Mac M1 from source tarball from testpypi [\#82](https://github.com/apache/arrow-datafusion-python/issues/82)
+- ImportPathMismatchError when running pytest locally [\#77](https://github.com/apache/arrow-datafusion-python/issues/77)
+
+**Closed issues:**
+
+- Publish documentation for Python bindings [\#39](https://github.com/apache/arrow-datafusion-python/issues/39)
+- Add Python binding for `approx_median` [\#32](https://github.com/apache/arrow-datafusion-python/issues/32)
+- Release version 0.7.0 [\#7](https://github.com/apache/arrow-datafusion-python/issues/7)
+
+## [0.7.0-rc2](https://github.com/apache/arrow-datafusion-python/tree/0.7.0-rc2) (2022-11-26)
+
+[Full Changelog](https://github.com/apache/arrow-datafusion-python/compare/0.7.0...0.7.0-rc2)
+
 
 ## [Unreleased](https://github.com/datafusion-contrib/datafusion-python/tree/HEAD)
 
diff --git a/Cargo.lock b/Cargo.lock
index 5059afa..04a2ea8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1134,9 +1134,9 @@
 
 [[package]]
 name = "http"
-version = "0.2.8"
+version = "0.2.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
+checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482"
 dependencies = [
  "bytes",
  "fnv",
@@ -1385,9 +1385,9 @@
 
 [[package]]
 name = "libflate"
-version = "1.2.0"
+version = "1.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "05605ab2bce11bcfc0e9c635ff29ef8b2ea83f29be257ee7d730cac3ee373093"
+checksum = "97822bf791bd4d5b403713886a5fbe8bf49520fe78e323b0dc480ca1a03e50b0"
 dependencies = [
  "adler32",
  "crc32fast",
@@ -1396,9 +1396,9 @@
 
 [[package]]
 name = "libflate_lz77"
-version = "1.1.0"
+version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "39a734c0493409afcd49deee13c006a04e3586b9761a03543c6272c9c51f2f5a"
+checksum = "a52d3a8bfc85f250440e4424db7d857e241a3aebbbe301f3eb606ab15c39acbf"
 dependencies = [
  "rle-decode-fast",
 ]
@@ -2359,9 +2359,9 @@
 
 [[package]]
 name = "slab"
-version = "0.4.7"
+version = "0.4.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4614a76b2a8be0058caa9dbbaf66d988527d86d003c11a94fbd335d7661edcef"
+checksum = "6528351c9bc8ab22353f9d776db39a20288e8d6c37ef8cfe3317cf875eecfc2d"
 dependencies = [
  "autocfg",
 ]
@@ -2635,9 +2635,9 @@
 
 [[package]]
 name = "tokio-stream"
-version = "0.1.11"
+version = "0.1.12"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
+checksum = "8fb52b74f05dbf495a8fba459fdc331812b96aa086d9eb78101fa0d4569c3313"
 dependencies = [
  "futures-core",
  "pin-project-lite",
diff --git a/README.md b/README.md
index ab89ff6..e78f613 100644
--- a/README.md
+++ b/README.md
@@ -24,19 +24,30 @@
 
 This is a Python library that binds to [Apache Arrow](https://arrow.apache.org/) in-memory query engine [DataFusion](https://github.com/apache/arrow-datafusion).
 
-Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV
-files, run it in a multi-threaded environment, and obtain the result back in Python.
+DataFusion's Python bindings can be used as an end-user tool as well as providing a foundation for building new systems.
 
-It also allows you to use UDFs and UDAFs for complex operations.
+## Features
 
-The major advantage of this library over other execution engines is that this library achieves zero-copy between
-Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart
-from having to lock the GIL when running those operations.
+- Execute queries using SQL or DataFrames against CSV, Parquet, and JSON data sources
+- Queries are optimized using DataFusion's query optimizer
+- Execute user-defined Python code from SQL
+- Exchange data with Pandas and other DataFrame libraries that support PyArrow
+- Serialize and deserialize query plans in Substrait format
+- Experimental support for executing SQL queries against Polars, Pandas and cuDF
 
-Its query engine, DataFusion, is written in [Rust](https://www.rust-lang.org/), which makes strong assumptions
-about thread safety and lack of memory leaks.
+## Comparison with other projects
 
-Technically, zero-copy is achieved via the [c data interface](https://arrow.apache.org/docs/format/CDataInterface.html).
+Here is a comparison with similar projects that may help understand when DataFusion might be suitable and unsuitable 
+for your needs:
+
+- [DuckDB](http://www.duckdb.org/) is an open source, in-process analytic database. Like DataFusion, it supports 
+ very fast execution, both from its custom file format and directly from Parquet files. Unlike DataFusion, it is 
+ written in C/C++ and it is primarily used directly by users as a serverless database and query system rather than 
+ as a library for building such database systems.
+
+- [Polars](http://pola.rs/) is one of the fastest DataFrame libraries at the time of writing. Like DataFusion, it 
+ is also written in Rust and uses the Apache Arrow memory model, but unlike DataFusion it does not provide full SQL 
+ support, nor as many extension points.
 
 ## Example Usage
 
@@ -47,12 +58,8 @@
 
 - https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
 
-See the [examples](examples) directory for more examples.
-
 ```python
 from datafusion import SessionContext
-import pandas as pd
-import pyarrow as pa
 
 # Create a DataFusion context
 ctx = SessionContext()
@@ -67,17 +74,11 @@
              "group by passenger_count "
              "order by passenger_count")
 
-# collect as list of pyarrow.RecordBatch
-results = df.collect()
-
-# get first batch
-batch = results[0]
-
 # convert to Pandas
-df = batch.to_pandas()
+pandas_df = df.to_pandas()
 
 # create a chart
-fig = df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
+fig = pandas_df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
 fig.savefig('chart.png')
 ```
 
@@ -85,42 +86,30 @@
 
 ![Chart](examples/chart.png)
 
-## Substrait Support
+## More Examples
 
-`arrow-datafusion-python` has bindings which allow for serializing a SQL query to substrait protobuf format and deserializing substrait protobuf bytes to a DataFusion `LogicalPlan`, `PyLogicalPlan` in a Python context, which can then be executed.
+See [examples](examples/README.md) for more information.
 
-### Example of Serializing/Deserializing Substrait Plans
+### Executing Queries with DataFusion
 
-```python
-from datafusion import SessionContext
-from datafusion import substrait as ss
+- [Query a Parquet file using SQL](./examples/sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./examples/dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./examples/sql-to-pandas.py)
+- [Query PyArrow Data](./examples/query-pyarrow-data.py)
 
-# Create a DataFusion context
-ctx = SessionContext()
+### Running User-Defined Python Code
 
-# Register table with context
-ctx.register_parquet('aggregate_test_data', './testing/data/csv/aggregate_test_100.csv')
+- [Register a Python UDF with DataFusion](./examples/python-udf.py)
+- [Register a Python UDAF with DataFusion](./examples/python-udaf.py)
 
-substrait_plan = ss.substrait.serde.serialize_to_plan("SELECT * FROM aggregate_test_data", ctx)
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+### Substrait Support
 
-# Alternative serialization approaches
-# type(substrait_bytes) -> <class 'list'>, at this point the bytes can be distributed to file, network, etc safely
-# where they could subsequently be deserialized on the receiving end.
-substrait_bytes = ss.substrait.serde.serialize_bytes("SELECT * FROM aggregate_test_data", ctx)
+- [Serialize query plans using Substrait](./examples/substrait.py)
 
-# Imagine here bytes would be read from network, file, etc ... for example brevity this is omitted and variable is simply reused
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
-substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)
+### Executing SQL against DataFrame Libraries (Experimental)
 
-# type(df_logical_plan) -> <class 'substrait.LogicalPlan'>
-df_logical_plan = ss.substrait.consumer.from_substrait_plan(ctx, substrait_plan)
-
-# Back to Substrait Plan just for demonstration purposes
-# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
-substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan)
-
-```
+- [Executing SQL on Polars](./examples/sql-on-polars.py)
+- [Executing SQL on Pandas](./examples/sql-on-pandas.py)
 
 ## How to install (from pip)
 
diff --git a/conda/environments/datafusion-dev.yaml b/conda/environments/datafusion-dev.yaml
index 0e17e16..d9405e4 100644
--- a/conda/environments/datafusion-dev.yaml
+++ b/conda/environments/datafusion-dev.yaml
@@ -28,7 +28,7 @@
 - pytest
 - toml
 - importlib_metadata
-- python>=3.7,<3.11
+- python>=3.10
 # Packages useful for building distributions and releasing
 - mamba
 - conda-build
@@ -38,4 +38,7 @@
 - pydata-sphinx-theme==0.8.0
 - myst-parser
 - jinja2
+# GPU packages
+- cudf
+- cudatoolkit=11.8
 name: datafusion-dev
diff --git a/datafusion/__init__.py b/datafusion/__init__.py
index b6cd517..46206f0 100644
--- a/datafusion/__init__.py
+++ b/datafusion/__init__.py
@@ -41,8 +41,12 @@
 )
 
 from .expr import (
+    Analyze,
     Expr,
+    Filter,
+    Limit,
     Projection,
+    Sort,
     TableScan,
 )
 
@@ -63,6 +67,10 @@
     "Projection",
     "DFSchema",
     "DFField",
+    "Analyze",
+    "Sort",
+    "Limit",
+    "Filter",
 ]
 
 
diff --git a/datafusion/cudf.py b/datafusion/cudf.py
new file mode 100644
index 0000000..c38819c
--- /dev/null
+++ b/datafusion/cudf.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import cudf
+import datafusion
+from datafusion.expr import Projection, TableScan, Column
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_cudf_expr(self, expr):
+
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return expr.name()
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_cudf_df(self, plan):
+        # recurse down first to translate inputs into pandas data frames
+        inputs = [self.to_cudf_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_cudf_expr(expr) for expr in node.projections()]
+            return inputs[0][args]
+        elif isinstance(node, TableScan):
+            return cudf.read_parquet(self.parquet_tables[node.table_name()])
+        else:
+            raise Exception(
+                "unsupported logical operator: {}".format(type(node))
+            )
+
+    def sql(self, sql):
+        datafusion_df = self.datafusion_ctx.sql(sql)
+        plan = datafusion_df.logical_plan()
+        return self.to_cudf_df(plan)
diff --git a/datafusion/pandas.py b/datafusion/pandas.py
new file mode 100644
index 0000000..f8e5651
--- /dev/null
+++ b/datafusion/pandas.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pandas as pd
+import datafusion
+from datafusion.expr import Projection, TableScan, Column
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_pandas_expr(self, expr):
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return expr.name()
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_pandas_df(self, plan):
+        # recurse down first to translate inputs into pandas data frames
+        inputs = [self.to_pandas_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_pandas_expr(expr) for expr in node.projections()]
+            return inputs[0][args]
+        elif isinstance(node, TableScan):
+            return pd.read_parquet(self.parquet_tables[node.table_name()])
+        else:
+            raise Exception(
+                "unsupported logical operator: {}".format(type(node))
+            )
+
+    def sql(self, sql):
+        datafusion_df = self.datafusion_ctx.sql(sql)
+        plan = datafusion_df.logical_plan()
+        return self.to_pandas_df(plan)
diff --git a/datafusion/polars.py b/datafusion/polars.py
new file mode 100644
index 0000000..a1bafbe
--- /dev/null
+++ b/datafusion/polars.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import polars
+import datafusion
+from datafusion.expr import Projection, TableScan, Aggregate
+from datafusion.expr import Column, AggregateFunction
+
+
+class SessionContext:
+    def __init__(self):
+        self.datafusion_ctx = datafusion.SessionContext()
+        self.parquet_tables = {}
+
+    def register_parquet(self, name, path):
+        self.parquet_tables[name] = path
+        self.datafusion_ctx.register_parquet(name, path)
+
+    def to_polars_expr(self, expr):
+        # get Python wrapper for logical expression
+        expr = expr.to_variant()
+
+        if isinstance(expr, Column):
+            return polars.col(expr.name())
+        else:
+            raise Exception("unsupported expression: {}".format(expr))
+
+    def to_polars_df(self, plan):
+        # recurse down first to translate inputs into Polars data frames
+        inputs = [self.to_polars_df(x) for x in plan.inputs()]
+
+        # get Python wrapper for logical operator node
+        node = plan.to_variant()
+
+        if isinstance(node, Projection):
+            args = [self.to_polars_expr(expr) for expr in node.projections()]
+            return inputs[0].select(*args)
+        elif isinstance(node, Aggregate):
+            groupby_expr = [
+                self.to_polars_expr(expr) for expr in node.group_by_exprs()
+            ]
+            aggs = []
+            for expr in node.aggregate_exprs():
+                expr = expr.to_variant()
+                if isinstance(expr, AggregateFunction):
+                    if expr.aggregate_type() == "COUNT":
+                        aggs.append(polars.count().alias("{}".format(expr)))
+                    else:
+                        raise Exception(
+                            "Unsupported aggregate function {}".format(
+                                expr.aggregate_type()
+                            )
+                        )
+                else:
+                    raise Exception(
+                        "Unsupported aggregate function {}".format(expr)
+                    )
+            df = inputs[0].groupby(groupby_expr).agg(aggs)
+            return df
+        elif isinstance(node, TableScan):
+            return polars.read_parquet(self.parquet_tables[node.table_name()])
+        else:
+            raise Exception(
+                "unsupported logical operator: {}".format(type(node))
+            )
+
+    def sql(self, sql):
+        datafusion_df = self.datafusion_ctx.sql(sql)
+        plan = datafusion_df.logical_plan()
+        return self.to_polars_df(plan)
diff --git a/datafusion/tests/test_context.py b/datafusion/tests/test_context.py
index 6faffaf..efa2ede 100644
--- a/datafusion/tests/test_context.py
+++ b/datafusion/tests/test_context.py
@@ -35,7 +35,6 @@
 
 
 def test_create_context_with_all_valid_args():
-
     runtime = (
         RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
     )
diff --git a/datafusion/tests/test_dataframe.py b/datafusion/tests/test_dataframe.py
index 1894688..292a4b0 100644
--- a/datafusion/tests/test_dataframe.py
+++ b/datafusion/tests/test_dataframe.py
@@ -533,3 +533,14 @@
 def test_count(df):
     # Get number of rows
     assert df.count() == 3
+
+
+def test_to_pandas(df):
+    # Skip test if pandas is not installed
+    pd = pytest.importorskip("pandas")
+
+    # Convert datafusion dataframe to pandas dataframe
+    pandas_df = df.to_pandas()
+    assert type(pandas_df) == pd.DataFrame
+    assert pandas_df.shape == (3, 3)
+    assert set(pandas_df.columns) == {"a", "b", "c"}
diff --git a/datafusion/tests/test_expr.py b/datafusion/tests/test_expr.py
new file mode 100644
index 0000000..143eea6
--- /dev/null
+++ b/datafusion/tests/test_expr.py
@@ -0,0 +1,110 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+from datafusion.expr import Column, Literal, BinaryExpr, AggregateFunction
+from datafusion.expr import (
+    Projection,
+    Filter,
+    Aggregate,
+    Limit,
+    Sort,
+    TableScan,
+)
+import pytest
+
+
+@pytest.fixture
+def test_ctx():
+    ctx = SessionContext()
+    ctx.register_csv("test", "testing/data/csv/aggregate_test_100.csv")
+    return ctx
+
+
+def test_projection(test_ctx):
+    df = test_ctx.sql("select c1, 123, c1 < 123 from test")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Projection)
+
+    expr = plan.projections()
+
+    col1 = expr[0].to_variant()
+    assert isinstance(col1, Column)
+    assert col1.name() == "c1"
+    assert col1.qualified_name() == "test.c1"
+
+    col2 = expr[1].to_variant()
+    assert isinstance(col2, Literal)
+    assert col2.data_type() == "Int64"
+    assert col2.value_i64() == 123
+
+    col3 = expr[2].to_variant()
+    assert isinstance(col3, BinaryExpr)
+    assert isinstance(col3.left().to_variant(), Column)
+    assert col3.op() == "<"
+    assert isinstance(col3.right().to_variant(), Literal)
+
+    plan = plan.input().to_variant()
+    assert isinstance(plan, TableScan)
+
+
+def test_filter(test_ctx):
+    df = test_ctx.sql("select c1 from test WHERE c1 > 5")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Projection)
+
+    plan = plan.input().to_variant()
+    assert isinstance(plan, Filter)
+
+
+def test_limit(test_ctx):
+    df = test_ctx.sql("select c1 from test LIMIT 10")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Limit)
+
+
+def test_aggregate_query(test_ctx):
+    df = test_ctx.sql("select c1, count(*) from test group by c1")
+    plan = df.logical_plan()
+
+    projection = plan.to_variant()
+    assert isinstance(projection, Projection)
+
+    aggregate = projection.input().to_variant()
+    assert isinstance(aggregate, Aggregate)
+
+    col1 = aggregate.group_by_exprs()[0].to_variant()
+    assert isinstance(col1, Column)
+    assert col1.name() == "c1"
+    assert col1.qualified_name() == "test.c1"
+
+    col2 = aggregate.aggregate_exprs()[0].to_variant()
+    assert isinstance(col2, AggregateFunction)
+
+
+def test_sort(test_ctx):
+    df = test_ctx.sql("select c1 from test order by c1")
+    plan = df.logical_plan()
+
+    plan = plan.to_variant()
+    assert isinstance(plan, Sort)
diff --git a/datafusion/tests/test_imports.py b/datafusion/tests/test_imports.py
index e5d9585..7eb8b7c 100644
--- a/datafusion/tests/test_imports.py
+++ b/datafusion/tests/test_imports.py
@@ -33,8 +33,17 @@
 
 from datafusion.expr import (
     Expr,
+    Column,
+    Literal,
+    BinaryExpr,
+    AggregateFunction,
     Projection,
     TableScan,
+    Filter,
+    Limit,
+    Aggregate,
+    Sort,
+    Analyze,
 )
 
 
@@ -55,9 +64,23 @@
     ]:
         assert klass.__module__ == "datafusion"
 
-    for klass in [Expr, Projection, TableScan]:
+    # expressions
+    for klass in [Expr, Column, Literal, BinaryExpr, AggregateFunction]:
         assert klass.__module__ == "datafusion.expr"
 
+    # operators
+    for klass in [
+        Projection,
+        TableScan,
+        Aggregate,
+        Sort,
+        Limit,
+        Filter,
+        Analyze,
+    ]:
+        assert klass.__module__ == "datafusion.expr"
+
+    # schema
     for klass in [DFField, DFSchema]:
         assert klass.__module__ == "datafusion.common"
 
diff --git a/dev/release/verify-release-candidate.sh b/dev/release/verify-release-candidate.sh
index fee276c..be86f69 100755
--- a/dev/release/verify-release-candidate.sh
+++ b/dev/release/verify-release-candidate.sh
@@ -125,15 +125,19 @@
   git clone https://github.com/apache/arrow-testing.git testing
   git clone https://github.com/apache/parquet-testing.git parquet-testing
 
-  cargo build
-  cargo test --all
+  python3 -m venv venv
+  source venv/bin/activate
+  python3 -m pip install -U pip
+  python3 -m pip install -r requirements-310.txt
+  maturin develop
+
+  #TODO: we should really run tests here as well
+  #python3 -m pytest
 
   if ( find -iname 'Cargo.toml' | xargs grep SNAPSHOT ); then
     echo "Cargo.toml version should not contain SNAPSHOT for releases"
     exit 1
   fi
-
-  cargo publish --dry-run
 }
 
 TEST_SUCCESS=no
diff --git a/examples/README.md b/examples/README.md
index a3ae0ba..ce98600 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -19,9 +19,31 @@
 
 # DataFusion Python Examples
 
-- [Query a Parquet file using SQL](./sql-parquet.py)
-- [Query a Parquet file using the DataFrame API](./dataframe-parquet.py)
-- [Run a SQL query and store the results in a Pandas DataFrame](./sql-to-pandas.py)
-- [Query PyArrow Data](./query-pyarrow-data.py)
-- [Register a Python UDF with DataFusion](./python-udf.py)
-- [Register a Python UDAF with DataFusion](./python-udaf.py)
+Some examples rely on data which can be downloaded from the following site:
+
+- https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
+
+Here is a direct link to the file used in the examples:
+
+- https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet
+
+### Executing Queries with DataFusion
+
+- [Query a Parquet file using SQL](./examples/sql-parquet.py)
+- [Query a Parquet file using the DataFrame API](./examples/dataframe-parquet.py)
+- [Run a SQL query and store the results in a Pandas DataFrame](./examples/sql-to-pandas.py)
+- [Query PyArrow Data](./examples/query-pyarrow-data.py)
+
+### Running User-Defined Python Code
+
+- [Register a Python UDF with DataFusion](./examples/python-udf.py)
+- [Register a Python UDAF with DataFusion](./examples/python-udaf.py)
+
+### Substrait Support
+
+- [Serialize query plans using Substrait](./examples/substrait.py)
+
+### Executing SQL against DataFrame Libraries (Experimental)
+
+- [Executing SQL on Polars](./examples/sql-on-polars.py)
+- [Executing SQL on Pandas](./examples/sql-on-pandas.py)
diff --git a/examples/dataframe-parquet.py b/examples/dataframe-parquet.py
index 31a8aa6..0f2e4b8 100644
--- a/examples/dataframe-parquet.py
+++ b/examples/dataframe-parquet.py
@@ -19,7 +19,7 @@
 from datafusion import functions as f
 
 ctx = SessionContext()
-df = ctx.read_parquet(
-    "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-).aggregate([f.col("passenger_count")], [f.count_star()])
+df = ctx.read_parquet("yellow_tripdata_2021-01.parquet").aggregate(
+    [f.col("passenger_count")], [f.count_star()]
+)
 df.show()
diff --git a/examples/sql-on-cudf.py b/examples/sql-on-cudf.py
new file mode 100644
index 0000000..407cb1f
--- /dev/null
+++ b/examples/sql-on-cudf.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion.cudf import SessionContext
+
+
+ctx = SessionContext()
+ctx.register_parquet(
+    "taxi", "/home/jeremy/Downloads/yellow_tripdata_2021-01.parquet"
+)
+df = ctx.sql("select passenger_count from taxi")
+print(df)
diff --git a/examples/sql-on-pandas.py b/examples/sql-on-pandas.py
new file mode 100644
index 0000000..0efd776
--- /dev/null
+++ b/examples/sql-on-pandas.py
@@ -0,0 +1,24 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion.pandas import SessionContext
+
+
+ctx = SessionContext()
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
+df = ctx.sql("select passenger_count from taxi")
+print(df)
diff --git a/examples/sql-on-polars.py b/examples/sql-on-polars.py
new file mode 100644
index 0000000..c208114
--- /dev/null
+++ b/examples/sql-on-polars.py
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion.polars import SessionContext
+
+
+ctx = SessionContext()
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
+df = ctx.sql(
+    "select passenger_count, count(*) from taxi group by passenger_count"
+)
+print(df)
diff --git a/examples/sql-parquet.py b/examples/sql-parquet.py
index 7b2db6f..3cc9fbd 100644
--- a/examples/sql-parquet.py
+++ b/examples/sql-parquet.py
@@ -18,9 +18,7 @@
 from datafusion import SessionContext
 
 ctx = SessionContext()
-ctx.register_parquet(
-    "taxi", "/mnt/bigdata/nyctaxi/yellow/2021/yellow_tripdata_2021-01.parquet"
-)
+ctx.register_parquet("taxi", "yellow_tripdata_2021-01.parquet")
 df = ctx.sql(
     "select passenger_count, count(*) from taxi where passenger_count is not null group by passenger_count order by passenger_count"
 )
diff --git a/examples/sql-to-pandas.py b/examples/sql-to-pandas.py
index 3569e6d..3e99b22 100644
--- a/examples/sql-to-pandas.py
+++ b/examples/sql-to-pandas.py
@@ -33,17 +33,11 @@
     "order by passenger_count"
 )
 
-# collect as list of pyarrow.RecordBatch
-results = df.collect()
-
-# get first batch
-batch = results[0]
-
 # convert to Pandas
-df = batch.to_pandas()
+pandas_df = df.to_pandas()
 
 # create a chart
-fig = df.plot(
+fig = pandas_df.plot(
     kind="bar", title="Trip Count by Number of Passengers"
 ).get_figure()
 fig.savefig("chart.png")
diff --git a/examples/substrait.py b/examples/substrait.py
new file mode 100644
index 0000000..c167f7d
--- /dev/null
+++ b/examples/substrait.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionContext
+from datafusion import substrait as ss
+
+
+# Create a DataFusion context
+ctx = SessionContext()
+
+# Register table with context
+ctx.register_parquet(
+    "aggregate_test_data", "./testing/data/csv/aggregate_test_100.csv"
+)
+
+substrait_plan = ss.substrait.serde.serialize_to_plan(
+    "SELECT * FROM aggregate_test_data", ctx
+)
+# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+
+# Alternative serialization approaches
+# type(substrait_bytes) -> <class 'list'>, at this point the bytes can be distributed to file, network, etc safely
+# where they could subsequently be deserialized on the receiving end.
+substrait_bytes = ss.substrait.serde.serialize_bytes(
+    "SELECT * FROM aggregate_test_data", ctx
+)
+
+# Imagine here bytes would be read from network, file, etc ... for example brevity this is omitted and variable is simply reused
+# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+substrait_plan = ss.substrait.serde.deserialize_bytes(substrait_bytes)
+
+# type(df_logical_plan) -> <class 'substrait.LogicalPlan'>
+df_logical_plan = ss.substrait.consumer.from_substrait_plan(
+    ctx, substrait_plan
+)
+
+# Back to Substrait Plan just for demonstration purposes
+# type(substrait_plan) -> <class 'datafusion.substrait.plan'>
+substrait_plan = ss.substrait.producer.to_substrait_plan(df_logical_plan)
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 4b9fbca..a1c68dd 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -313,6 +313,24 @@
         Ok(())
     }
 
+    /// Convert to pandas dataframe with pyarrow
+    /// Collect the batches, pass to Arrow Table & then convert to Pandas DataFrame
+    fn to_pandas(&self, py: Python) -> PyResult<PyObject> {
+        let batches = self.collect(py);
+
+        Python::with_gil(|py| {
+            // Instantiate pyarrow Table object and use its from_batches method
+            let table_class = py.import("pyarrow")?.getattr("Table")?;
+            let args = PyTuple::new(py, batches);
+            let table: PyObject = table_class.call_method1("from_batches", args)?.into();
+
+            // Use Table.to_pandas() method to convert batches to pandas dataframe
+            // See also: https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.to_pandas
+            let result = table.call_method0(py, "to_pandas")?;
+            Ok(result)
+        })
+    }
+
     // Executes this DataFrame to get the total number of rows.
     fn count(&self, py: Python) -> PyResult<usize> {
         Ok(wait_for_future(py, self.df.as_ref().clone().count())?)
diff --git a/src/expr.rs b/src/expr.rs
index f3695fe..90ce6bf 100644
--- a/src/expr.rs
+++ b/src/expr.rs
@@ -22,10 +22,24 @@
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion_expr::{col, lit, Cast, Expr, GetIndexedField};
 
+use crate::errors::py_runtime_err;
+use crate::expr::aggregate_expr::PyAggregateFunction;
+use crate::expr::binary_expr::PyBinaryExpr;
+use crate::expr::column::PyColumn;
+use crate::expr::literal::PyLiteral;
 use datafusion::scalar::ScalarValue;
 
+pub mod aggregate;
+pub mod aggregate_expr;
+pub mod analyze;
+pub mod binary_expr;
+pub mod column;
+pub mod filter;
+pub mod limit;
+pub mod literal;
 pub mod logical_node;
 pub mod projection;
+pub mod sort;
 pub mod table_scan;
 
 /// A PyExpr that can be used on a DataFrame
@@ -49,6 +63,22 @@
 
 #[pymethods]
 impl PyExpr {
+    /// Return the specific expression
+    fn to_variant(&self, py: Python) -> PyResult<PyObject> {
+        Python::with_gil(|_| match &self.expr {
+            Expr::Column(col) => Ok(PyColumn::from(col.clone()).into_py(py)),
+            Expr::Literal(value) => Ok(PyLiteral::from(value.clone()).into_py(py)),
+            Expr::BinaryExpr(expr) => Ok(PyBinaryExpr::from(expr.clone()).into_py(py)),
+            Expr::AggregateFunction(expr) => {
+                Ok(PyAggregateFunction::from(expr.clone()).into_py(py))
+            }
+            other => Err(py_runtime_err(format!(
+                "Cannot convert this Expr to a Python object: {:?}",
+                other
+            ))),
+        })
+    }
+
     fn __richcmp__(&self, other: PyExpr, op: CompareOp) -> PyExpr {
         let expr = match op {
             CompareOp::Lt => self.expr.clone().lt(other.expr),
@@ -140,8 +170,20 @@
 
 /// Initializes the `expr` module to match the pattern of `datafusion-expr` https://docs.rs/datafusion-expr/latest/datafusion_expr/
 pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
+    // expressions
     m.add_class::<PyExpr>()?;
+    m.add_class::<PyColumn>()?;
+    m.add_class::<PyLiteral>()?;
+    m.add_class::<PyBinaryExpr>()?;
+    m.add_class::<PyLiteral>()?;
+    m.add_class::<PyAggregateFunction>()?;
+    // operators
     m.add_class::<table_scan::PyTableScan>()?;
     m.add_class::<projection::PyProjection>()?;
+    m.add_class::<filter::PyFilter>()?;
+    m.add_class::<limit::PyLimit>()?;
+    m.add_class::<aggregate::PyAggregate>()?;
+    m.add_class::<sort::PySort>()?;
+    m.add_class::<analyze::PyAnalyze>()?;
     Ok(())
 }
diff --git a/src/expr/aggregate.rs b/src/expr/aggregate.rs
new file mode 100644
index 0000000..98d1f55
--- /dev/null
+++ b/src/expr/aggregate.rs
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::Aggregate;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Aggregate", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAggregate {
+    aggregate: Aggregate,
+}
+
+impl From<Aggregate> for PyAggregate {
+    fn from(aggregate: Aggregate) -> PyAggregate {
+        PyAggregate { aggregate }
+    }
+}
+
+impl TryFrom<PyAggregate> for Aggregate {
+    type Error = DataFusionError;
+
+    fn try_from(agg: PyAggregate) -> Result<Self, Self::Error> {
+        Ok(agg.aggregate)
+    }
+}
+
+impl Display for PyAggregate {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Aggregate
+            \nGroupBy(s): {:?}
+            \nAggregates(s): {:?}
+            \nInput: {:?}
+            \nProjected Schema: {:?}",
+            &self.aggregate.group_expr,
+            &self.aggregate.aggr_expr,
+            self.aggregate.input,
+            self.aggregate.schema
+        )
+    }
+}
+
+#[pymethods]
+impl PyAggregate {
+    /// Retrieves the grouping expressions for this `Aggregate`
+    fn group_by_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .aggregate
+            .group_expr
+            .iter()
+            .map(|e| PyExpr::from(e.clone()))
+            .collect())
+    }
+
+    /// Retrieves the aggregate expressions for this `Aggregate`
+    fn aggregate_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .aggregate
+            .aggr_expr
+            .iter()
+            .map(|e| PyExpr::from(e.clone()))
+            .collect())
+    }
+
+    // Retrieves the input `LogicalPlan` to this `Aggregate` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.aggregate.input).clone())
+    }
+
+    // Resulting Schema for this `Aggregate` node instance
+    fn schema(&self) -> PyDFSchema {
+        (*self.aggregate.schema).clone().into()
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Aggregate({})", self))
+    }
+}
+
+impl LogicalNode for PyAggregate {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.aggregate.input).clone())]
+    }
+}
diff --git a/src/expr/aggregate_expr.rs b/src/expr/aggregate_expr.rs
new file mode 100644
index 0000000..1801051
--- /dev/null
+++ b/src/expr/aggregate_expr.rs
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::PyExpr;
+use datafusion_expr::expr::AggregateFunction;
+use pyo3::prelude::*;
+use std::fmt::{Display, Formatter};
+
+#[pyclass(name = "AggregateFunction", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAggregateFunction {
+    aggr: AggregateFunction,
+}
+
+impl From<PyAggregateFunction> for AggregateFunction {
+    fn from(aggr: PyAggregateFunction) -> Self {
+        aggr.aggr
+    }
+}
+
+impl From<AggregateFunction> for PyAggregateFunction {
+    fn from(aggr: AggregateFunction) -> PyAggregateFunction {
+        PyAggregateFunction { aggr }
+    }
+}
+
+impl Display for PyAggregateFunction {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        let args: Vec<String> = self.aggr.args.iter().map(|expr| expr.to_string()).collect();
+        write!(f, "{}({})", self.aggr.fun, args.join(", "))
+    }
+}
+
+#[pymethods]
+impl PyAggregateFunction {
+    /// Get the aggregate type, such as "MIN", or "MAX"
+    fn aggregate_type(&self) -> String {
+        format!("{}", self.aggr.fun)
+    }
+
+    /// is this a distinct aggregate such as `COUNT(DISTINCT expr)`
+    fn is_distinct(&self) -> bool {
+        self.aggr.distinct
+    }
+
+    /// Get the arguments to the aggregate function
+    fn args(&self) -> Vec<PyExpr> {
+        self.aggr
+            .args
+            .iter()
+            .map(|expr| PyExpr::from(expr.clone()))
+            .collect()
+    }
+
+    /// Get a String representation of this column
+    fn __repr__(&self) -> String {
+        format!("{}", self)
+    }
+}
diff --git a/src/expr/analyze.rs b/src/expr/analyze.rs
new file mode 100644
index 0000000..095fab0
--- /dev/null
+++ b/src/expr/analyze.rs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Analyze;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Analyze", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyAnalyze {
+    analyze: Analyze,
+}
+
+impl PyAnalyze {
+    pub fn new(analyze: Analyze) -> Self {
+        Self { analyze }
+    }
+}
+
+impl From<Analyze> for PyAnalyze {
+    fn from(analyze: Analyze) -> PyAnalyze {
+        PyAnalyze { analyze }
+    }
+}
+
+impl From<PyAnalyze> for Analyze {
+    fn from(analyze: PyAnalyze) -> Self {
+        analyze.analyze
+    }
+}
+
+impl Display for PyAnalyze {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(f, "Analyze Table")
+    }
+}
+
+#[pymethods]
+impl PyAnalyze {
+    fn verbose(&self) -> PyResult<bool> {
+        Ok(self.analyze.verbose)
+    }
+
+    /// Resulting Schema for this `Analyze` node instance
+    fn schema(&self) -> PyResult<PyDFSchema> {
+        Ok((*self.analyze.schema).clone().into())
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Analyze({})", self))
+    }
+}
+
+impl LogicalNode for PyAnalyze {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.analyze.input).clone())]
+    }
+}
diff --git a/src/expr/binary_expr.rs b/src/expr/binary_expr.rs
new file mode 100644
index 0000000..5f382b7
--- /dev/null
+++ b/src/expr/binary_expr.rs
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::expr::PyExpr;
+use datafusion_expr::BinaryExpr;
+use pyo3::prelude::*;
+
+#[pyclass(name = "BinaryExpr", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyBinaryExpr {
+    expr: BinaryExpr,
+}
+
+impl From<PyBinaryExpr> for BinaryExpr {
+    fn from(expr: PyBinaryExpr) -> Self {
+        expr.expr
+    }
+}
+
+impl From<BinaryExpr> for PyBinaryExpr {
+    fn from(expr: BinaryExpr) -> PyBinaryExpr {
+        PyBinaryExpr { expr }
+    }
+}
+
+#[pymethods]
+impl PyBinaryExpr {
+    fn left(&self) -> PyExpr {
+        self.expr.left.as_ref().clone().into()
+    }
+
+    fn right(&self) -> PyExpr {
+        self.expr.right.as_ref().clone().into()
+    }
+
+    fn op(&self) -> String {
+        format!("{}", self.expr.op)
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("{}", self.expr))
+    }
+}
diff --git a/src/expr/column.rs b/src/expr/column.rs
new file mode 100644
index 0000000..16b8bce
--- /dev/null
+++ b/src/expr/column.rs
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::Column;
+use pyo3::prelude::*;
+
+#[pyclass(name = "Column", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyColumn {
+    pub col: Column,
+}
+
+impl PyColumn {
+    pub fn new(col: Column) -> Self {
+        Self { col }
+    }
+}
+
+impl From<Column> for PyColumn {
+    fn from(col: Column) -> PyColumn {
+        PyColumn { col }
+    }
+}
+
+#[pymethods]
+impl PyColumn {
+    /// Get the column name
+    fn name(&self) -> String {
+        self.col.name.clone()
+    }
+
+    /// Get the column relation
+    fn relation(&self) -> Option<String> {
+        self.col.relation.clone()
+    }
+
+    /// Get the fully-qualified column name
+    fn qualified_name(&self) -> String {
+        self.col.flat_name()
+    }
+
+    /// Get a String representation of this column
+    fn __repr__(&self) -> String {
+        self.qualified_name()
+    }
+}
diff --git a/src/expr/filter.rs b/src/expr/filter.rs
new file mode 100644
index 0000000..b7b48b9
--- /dev/null
+++ b/src/expr/filter.rs
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Filter;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Filter", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyFilter {
+    filter: Filter,
+}
+
+impl From<Filter> for PyFilter {
+    fn from(filter: Filter) -> PyFilter {
+        PyFilter { filter }
+    }
+}
+
+impl From<PyFilter> for Filter {
+    fn from(filter: PyFilter) -> Self {
+        filter.filter
+    }
+}
+
+impl Display for PyFilter {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Filter
+            \nPredicate: {:?}
+            \nInput: {:?}",
+            &self.filter.predicate, &self.filter.input
+        )
+    }
+}
+
+#[pymethods]
+impl PyFilter {
+    /// Retrieves the predicate expression for this `Filter`
+    fn predicate(&self) -> PyExpr {
+        PyExpr::from(self.filter.predicate.clone())
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Filter` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.filter.input).clone())
+    }
+
+    /// Resulting Schema for this `Filter` node instance
+    fn schema(&self) -> PyDFSchema {
+        self.filter.input.schema().as_ref().clone().into()
+    }
+
+    fn __repr__(&self) -> String {
+        format!("Filter({})", self)
+    }
+}
+
+impl LogicalNode for PyFilter {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.filter.input).clone())]
+    }
+}
diff --git a/src/expr/limit.rs b/src/expr/limit.rs
new file mode 100644
index 0000000..a50e5b8
--- /dev/null
+++ b/src/expr/limit.rs
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_expr::logical_plan::Limit;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Limit", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyLimit {
+    limit: Limit,
+}
+
+impl From<Limit> for PyLimit {
+    fn from(limit: Limit) -> PyLimit {
+        PyLimit { limit }
+    }
+}
+
+impl From<PyLimit> for Limit {
+    fn from(limit: PyLimit) -> Self {
+        limit.limit
+    }
+}
+
+impl Display for PyLimit {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Limit
+            \nSkip: {}
+            \nFetch: {:?}
+            \nInput: {:?}",
+            &self.limit.skip, &self.limit.fetch, &self.limit.input
+        )
+    }
+}
+
+#[pymethods]
+impl PyLimit {
+    /// Retrieves the skip value for this `Limit`
+    fn skip(&self) -> usize {
+        self.limit.skip
+    }
+
+    /// Retrieves the fetch value for this `Limit`
+    fn fetch(&self) -> Option<usize> {
+        self.limit.fetch
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Limit` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.limit.input).clone())
+    }
+
+    /// Resulting Schema for this `Limit` node instance
+    fn schema(&self) -> PyResult<PyDFSchema> {
+        Ok(self.limit.input.schema().as_ref().clone().into())
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Limit({})", self))
+    }
+}
+
+impl LogicalNode for PyLimit {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.limit.input).clone())]
+    }
+}
diff --git a/src/expr/literal.rs b/src/expr/literal.rs
new file mode 100644
index 0000000..27674ce
--- /dev/null
+++ b/src/expr/literal.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::errors::py_runtime_err;
+use datafusion_common::ScalarValue;
+use pyo3::prelude::*;
+
+#[pyclass(name = "Literal", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PyLiteral {
+    pub value: ScalarValue,
+}
+
+impl From<PyLiteral> for ScalarValue {
+    fn from(lit: PyLiteral) -> ScalarValue {
+        lit.value
+    }
+}
+
+impl From<ScalarValue> for PyLiteral {
+    fn from(value: ScalarValue) -> PyLiteral {
+        PyLiteral { value }
+    }
+}
+
+#[pymethods]
+impl PyLiteral {
+    /// Get the data type of this literal value
+    fn data_type(&self) -> String {
+        format!("{}", self.value.get_datatype())
+    }
+
+    fn value_i32(&self) -> PyResult<i32> {
+        if let ScalarValue::Int32(Some(n)) = &self.value {
+            Ok(*n)
+        } else {
+            Err(py_runtime_err("Cannot access value as i32"))
+        }
+    }
+
+    fn value_i64(&self) -> PyResult<i64> {
+        if let ScalarValue::Int64(Some(n)) = &self.value {
+            Ok(*n)
+        } else {
+            Err(py_runtime_err("Cannot access value as i64"))
+        }
+    }
+
+    fn value_str(&self) -> PyResult<String> {
+        if let ScalarValue::Utf8(Some(str)) = &self.value {
+            Ok(str.clone())
+        } else {
+            Err(py_runtime_err("Cannot access value as string"))
+        }
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("{}", self.value))
+    }
+}
diff --git a/src/expr/projection.rs b/src/expr/projection.rs
index 6d04e59..4c158f7 100644
--- a/src/expr/projection.rs
+++ b/src/expr/projection.rs
@@ -15,13 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use datafusion_common::DataFusionError;
 use datafusion_expr::logical_plan::Projection;
 use pyo3::prelude::*;
 use std::fmt::{self, Display, Formatter};
 
 use crate::common::df_schema::PyDFSchema;
-use crate::errors::py_runtime_err;
 use crate::expr::logical_node::LogicalNode;
 use crate::expr::PyExpr;
 use crate::sql::logical::PyLogicalPlan;
@@ -32,21 +30,21 @@
     projection: Projection,
 }
 
+impl PyProjection {
+    pub fn new(projection: Projection) -> Self {
+        Self { projection }
+    }
+}
+
 impl From<Projection> for PyProjection {
     fn from(projection: Projection) -> PyProjection {
         PyProjection { projection }
     }
 }
 
-impl TryFrom<PyProjection> for Projection {
-    type Error = DataFusionError;
-
-    fn try_from(py_proj: PyProjection) -> Result<Self, Self::Error> {
-        Projection::try_new_with_schema(
-            py_proj.projection.expr,
-            py_proj.projection.input.clone(),
-            py_proj.projection.schema,
-        )
+impl From<PyProjection> for Projection {
+    fn from(proj: PyProjection) -> Self {
+        proj.projection
     }
 }
 
@@ -66,8 +64,7 @@
 #[pymethods]
 impl PyProjection {
     /// Retrieves the expressions for this `Projection`
-    #[pyo3(name = "projections")]
-    fn py_projections(&self) -> PyResult<Vec<PyExpr>> {
+    fn projections(&self) -> PyResult<Vec<PyExpr>> {
         Ok(self
             .projection
             .expr
@@ -76,25 +73,13 @@
             .collect())
     }
 
-    // Retrieves the input `LogicalPlan` to this `Projection` node
-    #[pyo3(name = "input")]
-    fn py_input(&self) -> PyResult<PyLogicalPlan> {
-        // DataFusion make a loose guarantee that each Projection should have an input, however
-        // we check for that hear since we are performing explicit index retrieval
-        let inputs = LogicalNode::input(self);
-        if !inputs.is_empty() {
-            return Ok(inputs[0].clone());
-        }
-
-        Err(py_runtime_err(format!(
-            "Expected `input` field for Projection node: {}",
-            self
-        )))
+    /// Retrieves the input `LogicalPlan` to this `Projection` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.projection.input).clone())
     }
 
-    // Resulting Schema for this `Projection` node instance
-    #[pyo3(name = "schema")]
-    fn py_schema(&self) -> PyResult<PyDFSchema> {
+    /// Resulting Schema for this `Projection` node instance
+    fn schema(&self) -> PyResult<PyDFSchema> {
         Ok((*self.projection.schema).clone().into())
     }
 
diff --git a/src/expr/sort.rs b/src/expr/sort.rs
new file mode 100644
index 0000000..1d0a7f6
--- /dev/null
+++ b/src/expr/sort.rs
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::DataFusionError;
+use datafusion_expr::logical_plan::Sort;
+use pyo3::prelude::*;
+use std::fmt::{self, Display, Formatter};
+
+use crate::common::df_schema::PyDFSchema;
+use crate::expr::logical_node::LogicalNode;
+use crate::expr::PyExpr;
+use crate::sql::logical::PyLogicalPlan;
+
+#[pyclass(name = "Sort", module = "datafusion.expr", subclass)]
+#[derive(Clone)]
+pub struct PySort {
+    sort: Sort,
+}
+
+impl From<Sort> for PySort {
+    fn from(sort: Sort) -> PySort {
+        PySort { sort }
+    }
+}
+
+impl TryFrom<PySort> for Sort {
+    type Error = DataFusionError;
+
+    fn try_from(agg: PySort) -> Result<Self, Self::Error> {
+        Ok(agg.sort)
+    }
+}
+
+impl Display for PySort {
+    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+        write!(
+            f,
+            "Sort
+            \nExpr(s): {:?}
+            \nInput: {:?}
+            \nSchema: {:?}",
+            &self.sort.expr,
+            self.sort.input,
+            self.sort.input.schema()
+        )
+    }
+}
+
+#[pymethods]
+impl PySort {
+    /// Retrieves the sort expressions for this `Sort`
+    fn sort_exprs(&self) -> PyResult<Vec<PyExpr>> {
+        Ok(self
+            .sort
+            .expr
+            .iter()
+            .map(|e| PyExpr::from(e.clone()))
+            .collect())
+    }
+
+    /// Retrieves the input `LogicalPlan` to this `Sort` node
+    fn input(&self) -> PyLogicalPlan {
+        PyLogicalPlan::from((*self.sort.input).clone())
+    }
+
+    /// Resulting Schema for this `Sort` node instance
+    fn schema(&self) -> PyDFSchema {
+        self.sort.input.schema().as_ref().clone().into()
+    }
+
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("Sort({})", self))
+    }
+}
+
+impl LogicalNode for PySort {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        vec![PyLogicalPlan::from((*self.sort.input).clone())]
+    }
+}
diff --git a/src/expr/table_scan.rs b/src/expr/table_scan.rs
index 00504b9..2784523 100644
--- a/src/expr/table_scan.rs
+++ b/src/expr/table_scan.rs
@@ -19,6 +19,8 @@
 use pyo3::prelude::*;
 use std::fmt::{self, Display, Formatter};
 
+use crate::expr::logical_node::LogicalNode;
+use crate::sql::logical::PyLogicalPlan;
 use crate::{common::df_schema::PyDFSchema, expr::PyExpr};
 
 #[pyclass(name = "TableScan", module = "datafusion.expr", subclass)]
@@ -27,6 +29,12 @@
     table_scan: TableScan,
 }
 
+impl PyTableScan {
+    pub fn new(table_scan: TableScan) -> Self {
+        Self { table_scan }
+    }
+}
+
 impl From<PyTableScan> for TableScan {
     fn from(tbl_scan: PyTableScan) -> TableScan {
         tbl_scan.table_scan
@@ -117,3 +125,10 @@
         Ok(format!("TableScan({})", self))
     }
 }
+
+impl LogicalNode for PyTableScan {
+    fn input(&self) -> Vec<PyLogicalPlan> {
+        // table scans are leaf nodes and do not have inputs
+        vec![]
+    }
+}
diff --git a/src/sql/logical.rs b/src/sql/logical.rs
index dcd7baa..ee48f1e 100644
--- a/src/sql/logical.rs
+++ b/src/sql/logical.rs
@@ -17,6 +17,14 @@
 
 use std::sync::Arc;
 
+use crate::errors::py_runtime_err;
+use crate::expr::aggregate::PyAggregate;
+use crate::expr::analyze::PyAnalyze;
+use crate::expr::filter::PyFilter;
+use crate::expr::limit::PyLimit;
+use crate::expr::projection::PyProjection;
+use crate::expr::sort::PySort;
+use crate::expr::table_scan::PyTableScan;
 use datafusion_expr::LogicalPlan;
 use pyo3::prelude::*;
 
@@ -33,12 +41,33 @@
             plan: Arc::new(plan),
         }
     }
+
+    pub fn plan(&self) -> Arc<LogicalPlan> {
+        self.plan.clone()
+    }
 }
 
 #[pymethods]
 impl PyLogicalPlan {
+    /// Return the specific logical operator
+    fn to_variant(&self, py: Python) -> PyResult<PyObject> {
+        Python::with_gil(|_| match self.plan.as_ref() {
+            LogicalPlan::Aggregate(plan) => Ok(PyAggregate::from(plan.clone()).into_py(py)),
+            LogicalPlan::Analyze(plan) => Ok(PyAnalyze::from(plan.clone()).into_py(py)),
+            LogicalPlan::Filter(plan) => Ok(PyFilter::from(plan.clone()).into_py(py)),
+            LogicalPlan::Limit(plan) => Ok(PyLimit::from(plan.clone()).into_py(py)),
+            LogicalPlan::Projection(plan) => Ok(PyProjection::from(plan.clone()).into_py(py)),
+            LogicalPlan::Sort(plan) => Ok(PySort::from(plan.clone()).into_py(py)),
+            LogicalPlan::TableScan(plan) => Ok(PyTableScan::from(plan.clone()).into_py(py)),
+            other => Err(py_runtime_err(format!(
+                "Cannot convert this plan to a LogicalNode: {:?}",
+                other
+            ))),
+        })
+    }
+
     /// Get the inputs to this plan
-    pub fn inputs(&self) -> Vec<PyLogicalPlan> {
+    fn inputs(&self) -> Vec<PyLogicalPlan> {
         let mut inputs = vec![];
         for input in self.plan.inputs() {
             inputs.push(input.to_owned().into());
@@ -46,19 +75,23 @@
         inputs
     }
 
-    pub fn display(&self) -> String {
+    fn __repr__(&self) -> PyResult<String> {
+        Ok(format!("{:?}", self.plan))
+    }
+
+    fn display(&self) -> String {
         format!("{}", self.plan.display())
     }
 
-    pub fn display_indent(&self) -> String {
+    fn display_indent(&self) -> String {
         format!("{}", self.plan.display_indent())
     }
 
-    pub fn display_indent_schema(&self) -> String {
+    fn display_indent_schema(&self) -> String {
         format!("{}", self.plan.display_indent_schema())
     }
 
-    pub fn display_graphviz(&self) -> String {
+    fn display_graphviz(&self) -> String {
         format!("{}", self.plan.display_indent_schema())
     }
 }