Fix Python UDAF list-of-timestamps return by enforcing list-valued scalars and caching PyArrow types (#1347)

* Implement UDAF improvements for list type handling

Store UDAF return type in Rust accumulator and wrap
pyarrow Array/ChunkedArray returns into list scalars
for list-like return types. Add a UDAF test to return
a list of timestamps via a pyarrow array, validating
the aggregate output for correctness.

* Document UDAF list-valued scalar returns

Add documented list-valued scalar returns for UDAF
accumulators, including an example with pa.scalar and a note
about unsupported pyarrow.Array returns from evaluate().
Also, introduce a UDAF FAQ entry detailing list-returning
patterns and required return_type/state_type definitions.

* Fix pyarrow calls and improve type handling in RustAccumulator

* Refactor RustAccumulator to support pyarrow array types and improve type checking for list types

* Fixed PyO3 type mismatch by cloning Array/ChunkedArray types before unbinding and binding fresh copies when checking array-likeness, eliminating the Bound reference error

* Add timezone information to datetime objects in test_udaf_list_timestamp_return

* clippy fix

* Refactor RustAccumulator and utility functions for improved type handling and conversion from Python objects to Arrow types

* Enhance PyArrow integration by refining type handling and conversion in RustAccumulator and utility functions

* Fix array data binding in py_obj_to_scalar_value function

* Implement single point for scalar conversion from python objects

* Add unit tests and simplify python wrapper for literal

* Add nanoarrow and arro3-core to dev dependencies. Sort the dependencies alphabetically.

* Refactor common code into helper function so we do not duplicate it.

* Update import path to access Scalar type

* Add test for generic python objects that support the C interface

* Update unit test to pass back either pyarrow array or array wrapped as scalar

* Update tests to pass back raw python values or pyarrow scalar

* Expand on user documentation for how to return list arrays

* More user documentation

---------

Co-authored-by: Tim Saucer <timsaucer@gmail.com>
13 files changed
tree: 94eb6e04f3b121ae20a4d6c021046bb101da6245
  1. .cargo/
  2. .github/
  3. benchmarks/
  4. ci/
  5. dev/
  6. docs/
  7. examples/
  8. python/
  9. src/
  10. .asf.yaml
  11. .dockerignore
  12. .gitignore
  13. .gitmodules
  14. .pre-commit-config.yaml
  15. build.rs
  16. Cargo.lock
  17. Cargo.toml
  18. CHANGELOG.md
  19. LICENSE.txt
  20. pyproject.toml
  21. README.md
  22. rustfmt.toml
  23. uv.lock
README.md

DataFusion in Python

Python test Python Release Build

This is a Python library that binds to Apache Arrow in-memory query engine DataFusion.

DataFusion's Python bindings can be used as a foundation for building new data systems in Python. Here are some examples:

  • Dask SQL uses DataFusion's Python bindings for SQL parsing, query planning, and logical plan optimizations, and then transpiles the logical plan to Dask operations for execution.
  • DataFusion Ballista is a distributed SQL query engine that extends DataFusion's Python bindings for distributed use cases.
  • DataFusion Ray is another distributed query engine that uses DataFusion's Python bindings.

Features

  • Execute queries using SQL or DataFrames against CSV, Parquet, and JSON data sources.
  • Queries are optimized using DataFusion's query optimizer.
  • Execute user-defined Python code from SQL.
  • Exchange data with Pandas and other DataFrame libraries that support PyArrow.
  • Serialize and deserialize query plans in Substrait format.
  • Experimental support for transpiling SQL queries to DataFrame calls with Polars, Pandas, and cuDF.

For tips on tuning parallelism, see Maximizing CPU Usage in the configuration guide.

Example Usage

The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results in a Pandas DataFrame, and then plotting a chart.

The Parquet file used in this example can be downloaded from the following page:

from datafusion import SessionContext

# Create a DataFusion context
ctx = SessionContext()

# Register table with context
ctx.register_parquet('taxi', 'yellow_tripdata_2021-01.parquet')

# Execute SQL
df = ctx.sql("select passenger_count, count(*) "
             "from taxi "
             "where passenger_count is not null "
             "group by passenger_count "
             "order by passenger_count")

# convert to Pandas
pandas_df = df.to_pandas()

# create a chart
fig = pandas_df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure()
fig.savefig('chart.png')

This produces the following chart:

Chart

Registering a DataFrame as a View

You can use SessionContext's register_view method to convert a DataFrame into a view and register it with the context.

from datafusion import SessionContext, col, literal

# Create a DataFusion context
ctx = SessionContext()

# Create sample data
data = {"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]}

# Create a DataFrame from the dictionary
df = ctx.from_pydict(data, "my_table")

# Filter the DataFrame (for example, keep rows where a > 2)
df_filtered = df.filter(col("a") > literal(2))

# Register the dataframe as a view with the context
ctx.register_view("view1", df_filtered)

# Now run a SQL query against the registered view
df_view = ctx.sql("SELECT * FROM view1")

# Collect the results
results = df_view.collect()

# Convert results to a list of dictionaries for display
result_dicts = [batch.to_pydict() for batch in results]

print(result_dicts)

This will output:

[{'a': [3, 4, 5], 'b': [30, 40, 50]}]

Configuration

It is possible to configure runtime (memory and disk settings) and configuration settings when creating a context.

runtime = (
    RuntimeEnvBuilder()
    .with_disk_manager_os()
    .with_fair_spill_pool(10000000)
)
config = (
    SessionConfig()
    .with_create_default_catalog_and_schema(True)
    .with_default_catalog_and_schema("foo", "bar")
    .with_target_partitions(8)
    .with_information_schema(True)
    .with_repartition_joins(False)
    .with_repartition_aggregations(False)
    .with_repartition_windows(False)
    .with_parquet_pruning(False)
    .set("datafusion.execution.parquet.pushdown_filters", "true")
)
ctx = SessionContext(config, runtime)

Refer to the API documentation for more information.

Printing the context will show the current configuration settings.

print(ctx)

Extensions

For information about how to extend DataFusion Python, please see the extensions page of the online documentation.

More Examples

See examples for more information.

Executing Queries with DataFusion

Running User-Defined Python Code

Substrait Support

How to install

uv

uv add datafusion

Pip

pip install datafusion
# or
python -m pip install datafusion

Conda

conda install -c conda-forge datafusion

You can verify the installation by running:

>>> import datafusion
>>> datafusion.__version__
'0.6.0'

How to develop

This assumes that you have rust and cargo installed. We use the workflow recommended by pyo3 and maturin. The Maturin tools used in this workflow can be installed either via uv or pip. Both approaches should offer the same experience. It is recommended to use uv since it has significant performance improvements over pip.

Currently for protobuf support either protobuf or cmake must be installed.

Bootstrap (uv):

By default uv will attempt to build the datafusion python package. For our development we prefer to build manually. This means that when creating your virtual environment using uv sync you need to pass in the additional --no-install-package datafusion and for uv run commands the additional parameter --no-project

# fetch this repo
git clone git@github.com:apache/datafusion-python.git
# cd to the repo root
cd datafusion-python/
# create the virtual environment
uv sync --dev --no-install-package datafusion
# activate the environment
source .venv/bin/activate

Bootstrap (pip):

# fetch this repo
git clone git@github.com:apache/datafusion-python.git
# cd to the repo root
cd datafusion-python/
# prepare development environment (used to build wheel / install in development)
python3 -m venv .venv
# activate the venv
source .venv/bin/activate
# update pip itself if necessary
python -m pip install -U pip
# install dependencies
python -m pip install -r pyproject.toml

The tests rely on test data in git submodules.

git submodule update --init

Whenever rust code changes (your changes or via git pull):

# make sure you activate the venv using "source venv/bin/activate" first
maturin develop --uv
python -m pytest

Alternatively if you are using uv you can do the following without needing to activate the virtual environment:

uv run --no-project maturin develop --uv
uv run --no-project pytest .

Running & Installing pre-commit hooks

datafusion-python takes advantage of pre-commit to assist developers with code linting to help reduce the number of commits that ultimately fail in CI due to linter errors. Using the pre-commit hooks is optional for the developer but certainly helpful for keeping PRs clean and concise.

Our pre-commit hooks can be installed by running pre-commit install, which will install the configurations in your DATAFUSION_PYTHON_ROOT/.github directory and run each time you perform a commit, failing to complete the commit if an offending lint is found allowing you to make changes locally before pushing.

The pre-commit hooks can also be run adhoc without installing them by simply running pre-commit run --all-files.

NOTE: the current pre-commit hooks require docker, and cmake. See note on protobuf above.

Running linters without using pre-commit

There are scripts in ci/scripts for running Rust and Python linters.

./ci/scripts/python_lint.sh
./ci/scripts/rust_clippy.sh
./ci/scripts/rust_fmt.sh
./ci/scripts/rust_toml_fmt.sh

How to update dependencies

To change test dependencies, change the pyproject.toml and run

uv sync --dev --no-install-package datafusion