Add Interruptible Query Execution in Jupyter via KeyboardInterrupt Support (#1141) * fix: enhance error handling in async wait_for_future function * feat: implement async execution for execution plans in PySessionContext * fix: improve error message for execution failures in PySessionContext * fix: enhance error handling and improve execution plan retrieval in PyDataFrame * fix: ensure 'static lifetime for futures in wait_for_future function * fix: handle potential errors when caching DataFrame and retrieving execution plan * fix: flatten batches in PyDataFrame to ensure proper schema conversion * fix: correct error handling in batch processing for schema conversion * fix: flatten nested structure in PyDataFrame to ensure proper RecordBatch iteration * fix: improve error handling in PyDataFrame stream execution * fix: add utility to get Tokio Runtime with time enabled and update wait_for_future to use it * fix: store result of converting RecordBatches to PyArrow for debugging * fix: handle error from wait_for_future in PyDataFrame collect method * fix: propagate error from wait_for_future in collect_partitioned method * fix: enable IO in Tokio runtime with time support * main register_listing_table * Revert "main register_listing_table" This reverts commit 52a5efe2001455a3ad881968d468e5c7538e1ced. * fix: propagate error correctly from wait_for_future in PySessionContext methods * fix: simplify error handling in PySessionContext by unwrapping wait_for_future result * test: add interruption handling test for long-running queries in DataFusion * test: move test_collect_interrupted to test_dataframe.py * fix: add const for interval in wait_for_future utility * fix: use get_tokio_runtime instead of the custom get_runtime * Revert "fix: use get_tokio_runtime instead of the custom get_runtime" This reverts commit ca2d89289d0a702bbb38f34e88fb78ad61d20647. * fix: use get_tokio_runtime instead of the custom get_runtime * . * Revert "." This reverts commit b8ce3e446b74aac7a76f1cc8ce6501b453d4f13c. * fix: improve query interruption handling in test_collect_interrupted * fix: ensure proper handling of query interruption in test_collect_interrupted * fix: improve error handling in database table retrieval * refactor: add helper for async move * Revert "refactor: add helper for async move" This reverts commit faabf6dd90ac505934e7cb6dc3b69fddbe89e661. * move py_err_to_datafusion_err to errors.rs * add create_csv_read_options * fix * create_csv_read_options -> PyDataFusionResult * revert to before create_csv_read_options * refactor: simplify file compression type parsing in PySessionContext * fix: parse_compression_type once only * add create_ndjson_read_options * refactor comment for clarity in wait_for_future function * refactor wait_for_future to avoid spawn * remove unused py_err_to_datafusion_err function * add comment to clarify error handling in next method of PyRecordBatchStream * handle error from wait_for_future in PySubstraitSerializer * clarify comment on future pinning in wait_for_future function * refactor wait_for_future to use Duration for signal check interval * handle error from wait_for_future in count method of PyDataFrame * fix ruff errors * fix clippy errors * remove unused get_and_enter_tokio_runtime function and simplify wait_for_future * Refactor async handling in PySessionContext and PyDataFrame - Simplified async handling by removing unnecessary cloning of strings and context in various methods. - Streamlined the use of `wait_for_future` to directly handle futures without intermediate variables. - Improved error handling by directly propagating results from futures. - Enhanced readability by reducing boilerplate code in methods related to reading and writing data. - Updated the `wait_for_future` function to improve signal checking and future handling. * Organize imports in utils.rs for improved readability * map_err instead of panic * Fix error handling in async stream execution for PySessionContext and PyDataFrame
This is a Python library that binds to Apache Arrow in-memory query engine DataFusion.
DataFusion's Python bindings can be used as a foundation for building new data systems in Python. Here are some examples:
The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results in a Pandas DataFrame, and then plotting a chart.
The Parquet file used in this example can be downloaded from the following page:
from datafusion import SessionContext # Create a DataFusion context ctx = SessionContext() # Register table with context ctx.register_parquet('taxi', 'yellow_tripdata_2021-01.parquet') # Execute SQL df = ctx.sql("select passenger_count, count(*) " "from taxi " "where passenger_count is not null " "group by passenger_count " "order by passenger_count") # convert to Pandas pandas_df = df.to_pandas() # create a chart fig = pandas_df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure() fig.savefig('chart.png')
This produces the following chart:
You can use SessionContext's register_view method to convert a DataFrame into a view and register it with the context.
from datafusion import SessionContext, col, literal # Create a DataFusion context ctx = SessionContext() # Create sample data data = {"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]} # Create a DataFrame from the dictionary df = ctx.from_pydict(data, "my_table") # Filter the DataFrame (for example, keep rows where a > 2) df_filtered = df.filter(col("a") > literal(2)) # Register the dataframe as a view with the context ctx.register_view("view1", df_filtered) # Now run a SQL query against the registered view df_view = ctx.sql("SELECT * FROM view1") # Collect the results results = df_view.collect() # Convert results to a list of dictionaries for display result_dicts = [batch.to_pydict() for batch in results] print(result_dicts)
This will output:
[{'a': [3, 4, 5], 'b': [30, 40, 50]}]
It is possible to configure runtime (memory and disk settings) and configuration settings when creating a context.
runtime = ( RuntimeEnvBuilder() .with_disk_manager_os() .with_fair_spill_pool(10000000) ) config = ( SessionConfig() .with_create_default_catalog_and_schema(True) .with_default_catalog_and_schema("foo", "bar") .with_target_partitions(8) .with_information_schema(True) .with_repartition_joins(False) .with_repartition_aggregations(False) .with_repartition_windows(False) .with_parquet_pruning(False) .set("datafusion.execution.parquet.pushdown_filters", "true") ) ctx = SessionContext(config, runtime)
Refer to the API documentation for more information.
Printing the context will show the current configuration settings.
print(ctx)
For information about how to extend DataFusion Python, please see the extensions page of the online documentation.
See examples for more information.
uv add datafusion
pip install datafusion # or python -m pip install datafusion
conda install -c conda-forge datafusion
You can verify the installation by running:
>>> import datafusion >>> datafusion.__version__ '0.6.0'
This assumes that you have rust and cargo installed. We use the workflow recommended by pyo3 and maturin. The Maturin tools used in this workflow can be installed either via uv or pip. Both approaches should offer the same experience. It is recommended to use uv since it has significant performance improvements over pip.
Bootstrap (uv):
By default uv will attempt to build the datafusion python package. For our development we prefer to build manually. This means that when creating your virtual environment using uv sync you need to pass in the additional --no-install-package datafusion and for uv run commands the additional parameter --no-project
# fetch this repo git clone git@github.com:apache/datafusion-python.git # create the virtual enviornment uv sync --dev --no-install-package datafusion # activate the environment source .venv/bin/activate
Bootstrap (pip):
# fetch this repo git clone git@github.com:apache/datafusion-python.git # prepare development environment (used to build wheel / install in development) python3 -m venv .venv # activate the venv source .venv/bin/activate # update pip itself if necessary python -m pip install -U pip # install dependencies python -m pip install -r pyproject.toml
The tests rely on test data in git submodules.
git submodule update --init
Whenever rust code changes (your changes or via git pull):
# make sure you activate the venv using "source venv/bin/activate" first maturin develop --uv python -m pytest
Alternatively if you are using uv you can do the following without needing to activate the virtual environment:
uv run --no-project maturin develop --uv uv --no-project pytest .
datafusion-python takes advantage of pre-commit to assist developers with code linting to help reduce the number of commits that ultimately fail in CI due to linter errors. Using the pre-commit hooks is optional for the developer but certainly helpful for keeping PRs clean and concise.
Our pre-commit hooks can be installed by running pre-commit install, which will install the configurations in your DATAFUSION_PYTHON_ROOT/.github directory and run each time you perform a commit, failing to complete the commit if an offending lint is found allowing you to make changes locally before pushing.
The pre-commit hooks can also be run adhoc without installing them by simply running pre-commit run --all-files
There are scripts in ci/scripts for running Rust and Python linters.
./ci/scripts/python_lint.sh ./ci/scripts/rust_clippy.sh ./ci/scripts/rust_fmt.sh ./ci/scripts/rust_toml_fmt.sh
To change test dependencies, change the pyproject.toml and run
uv sync --dev --no-install-package datafusion