Add support for logical and physical codecs (#1541) * feat: unify logical + physical proto codec stack via SessionContext Introduces a single composable codec layer that every serializer reads from the session, replacing the hardcoded `DefaultLogicalExtensionCodec` / `DefaultPhysicalExtensionCodec` calls scattered across PyLogicalPlan, PyExecutionPlan, and the Rust-wrapped Python provider plumbing. Key changes: * New `PythonLogicalCodec` and `PythonPhysicalCodec` (crates/core/src/codec.rs) wrap any inner `LogicalExtensionCodec` / `PhysicalExtensionCodec`. Both share a `DFPYUDF1` magic-prefix path for in-band cloudpickle encoding of Python scalar UDFs, so an `ExecutionPlan` / `PhysicalExpr` referencing a Python `ScalarUDF` round-trips through either layer. Magic-prefix registry table (DFPYUDF1 in use; DFPYUDA1 / DFPYUDW1 / DFPYPE1 reserved) documented in the module header. * `PySessionContext` stores `Arc<PythonLogicalCodec>` and `Arc<PythonPhysicalCodec>` directly. FFI wrappers are built on demand via `ffi_logical_codec()` / `ffi_physical_codec()` for capsule export and downstream `RustWrappedPy*` consumers. Adds `__datafusion_physical_extension_codec__` getter + `with_physical_extension_codec` setter (symmetric with the logical pair). * `PyLogicalPlan.to_proto` / `from_proto` renamed to `to_bytes` / `from_bytes`, now reading the session's logical codec. `to_proto` / `from_proto` survive as deprecated thin wrappers emitting `DeprecationWarning`. * `PyExecutionPlan` gains the same `to_bytes` / `from_bytes` rename + deprecated aliases, plus `__datafusion_execution_plan__` capsule getter and `from_pycapsule` (ported from poc_ffi_query_planner). * New `PyPhysicalExpr` class with `to_bytes` / `from_bytes` / `from_pycapsule` / `__datafusion_physical_expr__`. `from_bytes` takes an input pyarrow Schema for column-reference resolution. * `datafusion-python-util` gains `from_pycapsule!` / `try_from_pycapsule!` macros + `physical_codec_from_pycapsule`, `task_context_from_pycapsule`, `create_physical_extension_capsule` (ported from poc_ffi_query_planner). * `PythonFunctionScalarUDF` exposes `func()`, `input_fields()`, `return_field()`, `volatility()`, `from_parts()` accessors needed by the codec. Python wrapper updates: `LogicalPlan` / `ExecutionPlan` add `to_bytes` / `from_bytes` + deprecate `to_proto` / `from_proto`; `ExecutionPlan` adds capsule getter + `from_pycapsule`; new `PhysicalExpr` wrapper class exported from the top-level package; `SessionContext` exposes the physical codec capsule + setter. Test coverage in python/tests/test_plans.py: round-trip via new API, deprecation warnings on old API, capsule protocol getters, session-routed codec on both layers. `PyLogicalPlan` PyCapsule protocol is intentionally not added — `datafusion-ffi` does not expose `FFI_LogicalPlan`, so there is no stable cross-crate shape to publish. Round-tripping a `LogicalPlan` goes through `to_bytes` / `from_bytes` only. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test: FFI-example integration tests for codec + plan capsule APIs Adds four downstream-crate fixtures in `datafusion-ffi-example` so the new PR1 surface can be tested with the same FFI-handoff pattern used for table providers, UDFs, etc. Existing tests prove the API exists; these tests prove it composes with code that lives in another crate. New Rust types in `examples/datafusion-ffi-example/src/`: * `MyLogicalExtensionCodec` — delegates to `DefaultLogicalExtensionCodec` and bumps atomic counters on the UDF encode/decode entry points. Exported via `__datafusion_logical_extension_codec__`. Installed onto a session with `ctx.with_logical_extension_codec(my_codec)`. * `MyPhysicalExtensionCodec` — mirror for `PhysicalExtensionCodec`. * `MyExecutionPlan` — wraps a one-column `EmptyExec`, exposes `__datafusion_execution_plan__`. Lets the receiver consume an `ExecutionPlan` capsule that did not originate in datafusion-python. * `MyPhysicalExpr` — wraps `Literal(Int32(42))`, exposes `__datafusion_physical_expr__`. Same FFI handoff for physical expressions. New tests: * `_test_logical_extension_codec.py` — codec installs cleanly, the session re-exports its capsule, and `try_encode_udf` fires on the user codec when serializing a plan that references a `ScalarUDF`. The decode counterpart is a round-trip check rather than a counter assertion: when the UDF is in the receiver's function registry, `parse_expr` resolves by name before consulting the codec. * `_test_physical_extension_codec.py` — symmetric. * `_test_execution_plan.py` — parametrized over typed-class vs raw-capsule input; verifies `ExecutionPlan.from_pycapsule` consumes the downstream capsule. * `_test_physical_expr.py` — same for `PhysicalExpr.from_pycapsule`. API changes forced by the new tests: * `PyLogicalPlan.to_bytes`, `PyExecutionPlan.to_bytes`, `PyPhysicalExpr.to_bytes` now accept an optional `ctx` parameter. When supplied, encoding routes through the session's installed codec instead of a fresh default. `ctx=None` preserves the previous default-codec behavior used by the deprecated `to_proto` shims. * The util `from_pycapsule!` / `try_from_pycapsule!` macros now validate the capsule name via `pointer_checked(Some(c"..."))` rather than `pointer_checked(None)`. The latter rejects named capsules outright with CPython's "incorrect name" error. * `SessionContext.with_logical_extension_codec` and `with_physical_extension_codec` now wrap the returned internal context in `SessionContext` so the result has the full Python surface. The pre-existing logical setter was returning a raw internal object that lacked `sql()` and friends. `examples/datafusion-ffi-example/Cargo.toml` gains `datafusion` and `datafusion-proto` workspace dependencies for the new Rust impls. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor: tighten PR1 scope to codec plumbing only Review feedback pass. PR1 is now strictly the composable codec layer + session routing + class-method serialization API. Anything that touches actual Python UDF inline encoding or Python expression wrapping moves to PR2 alongside the pickle work. Dropped: * `encode_python_scalar_udf` / `decode_python_scalar_udf` helpers from `crates/core/src/codec.rs`, along with cloudpickle and pyarrow imports. The wrapper codecs now delegate every method to `inner`. `DFPYUDF1` magic constant is kept (marked `dead_code` for now) as a reservation so PR2 has a single definition site. * `udf.rs` reverted to pre-PR1 shape. The codec no longer needs `func()` / `input_fields()` / `volatility()` / `from_parts()` accessors. Re-added by PR2 when scalar-UDF inlining lands. * `PyPhysicalExpr` class + Python wrapper + `__init__` export + `MyPhysicalExpr` FFI fixture + `_test_physical_expr.py`. No consumer in PR1 or PR2 plan documents; symmetry with `PyExecutionPlan` is not enough to justify the surface area. * Rust-side `PyLogicalPlan::to_proto` / `from_proto` and `PyExecutionPlan::to_proto` / `from_proto` deprecated wrappers. The deprecation lives entirely in the Python wrapper layer, which emits `DeprecationWarning` and forwards to `to_bytes` / `from_bytes`. Less Rust duplication. * `PythonLogicalCodec::with_default_inner` / `PythonPhysicalCodec::with_default_inner` — redundant with `impl Default`. Logic moved into `Default::default`. * `PySessionContext::default_logical_codec` / `default_physical_codec` helpers. Inlined as `Arc::new(PythonLogicalCodec::default())` at the three call sites. Tests (root: 1076, FFI example: 36) all green after the cuts. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * remove unuseful code comments * docs: rewrite codec module comments around purpose, not PR sequence The previous doc-block framed PythonLogicalCodec / PythonPhysicalCodec in terms of "PR1 delegates, PR2 will add encoding" — useful for review, useless for someone reading the code later. Reframed in terms of what the codecs exist to do: encode Python-side plan references (pure-Python UDFs, etc.) into the proto wire format so plans can cross process boundaries without the receiver having to pre-register every callable. The wrappers sit at the top of the session's codec stack and delegate non-Python encoding to a composable inner codec. Magic-prefix registry table loses the "reserved" column. Doc still notes that the in-module impls currently delegate and that encoder/decoder hooks land alongside the corresponding Python-side serialization work. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(codec): forward every LogicalExtensionCodec / PhysicalExtensionCodec method to inner PythonLogicalCodec previously only overrode the four required methods on the trait plus the scalar UDF pair, so the default trait impls (returning "LogicalExtensionCodec is not provided") shadowed any downstream FFI codec for file formats, aggregate UDFs, and window UDFs. A user installing their own codec via `SessionContext.with_logical_extension_codec(...)` would silently lose access to its `try_*_file_format`, `try_*_udaf`, `try_*_udwf` implementations. Forward every trait method to `inner` so the user-installed codec is fully reachable. Same change on the physical side, including `try_*_expr`, `try_*_udaf`, `try_*_udwf` — the corresponding Python-aware paths can layer on later by intercepting before delegation. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs: tighten codec dispatch test docstrings The previous docstrings claimed the tests verify "PythonLogicalCodec delegates non-Python UDFs to the inner codec." That's forward-looking — the codecs currently delegate every UDF unconditionally, so the test would behave identically for Python and non-Python UDFs. Rewrite to describe what the test actually proves: the dispatch chain `PyLogicalPlan.to_bytes -> session.logical_codec -> PythonLogicalCodec -> FFI -> user impl` (and the physical mirror) forwards correctly, observable via the user codec's atomic counter incrementing after one encode pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(ffi-example): MyExecutionPlan emits real data via MemorySourceConfig Was a one-column `EmptyExec` stub useful only as a capsule-handoff target. Promoted to a minimal reference impl that a downstream Rust crate can copy when exposing a custom `ExecutionPlan` to datafusion-python: configurable `num_rows`, produces a single batch of sequential `Int32` values under column `value`, wrapped in `DataSourceExec` via `MemorySourceConfig::try_new_exec`. Header comment explains the typical use case (remote backend, streaming source, synthetic data generator) and the `__datafusion_execution_plan__` capsule shape downstream crates should follow. Test asserts the schema-bearing plan survives the FFI hop: a `DataSourceExec` arrives with the expected partitioning and no children. Schema details are not surfaced through the FFI display path (only the wrapping `ForeignExecutionPlan` name + inner plan name appear), so the test does not assert the column name. `to_bytes` round-trip of an FFI-imported plan is not exercised: encoding requires a physical codec that knows how to serialize `ForeignExecutionPlan`, which the default codec does not. A downstream user round-tripping such a plan must install their own codec via `with_physical_extension_codec`. Documented in the test file rather than asserted on. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor: drop dormant ExecutionPlan PyCapsule round-trip `PyExecutionPlan::from_pycapsule` and the matching `__datafusion_execution_plan__` exporter have no consumer in this repo, on the POC `poc_ffi_query_planner` branch, or on any sibling branch (`testing/datafusion-distributed`, `testing/ffi-library-marker`, `tmp/ffi-with-codecs`). The pair was wired up speculatively for FFI plan handoff that no Python code path actually performs today. Drop the whole capsule round-trip for `ExecutionPlan`: * Rust `PyExecutionPlan::from_pycapsule` and `__datafusion_execution_plan__`. * Python `ExecutionPlan.from_pycapsule` and `__datafusion_execution_plan__` wrappers. * `MyExecutionPlan` FFI fixture + `_test_execution_plan.py` + lib.rs registration. Was solely a test fixture for the dropped path. * `test_execution_plan_pycapsule_protocol` in `python/tests/test_plans.py`. `PyExecutionPlan.to_bytes` / `from_bytes` survive — they encode through the session's physical codec and have real coverage. Capsule round-trip can be re-added when a concrete consumer (distributed worker, bridge library) lands. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat: PyExpr.to_bytes / from_bytes via session logical codec Mirrors PyLogicalPlan / PyExecutionPlan: encode through the session's installed `LogicalExtensionCodec` (or a default-inner `PythonLogicalCodec` when no `ctx` is supplied), decode against the session's function registry + codec via `parse_expr`. Rust side calls `datafusion_proto::logical_plan::to_proto::serialize_expr` and `from_proto::parse_expr`. Python wrapper threads an optional `SessionContext` through. Tests cover the session-routed roundtrip and the no-ctx default-codec encode path. Adds a third consumer of `session.logical_codec()` alongside `PyLogicalPlan` and the codec dispatch tests in the FFI example, broadening coverage of the codec stack. This is the last piece of the PR1 codec surface — follow-up pickle work (`Expr.__reduce__`, worker-scoped context, multiprocessing) can build on this without bundling the byte-level serialization API. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * test(ffi-example): assert codec roundtrip restores plan output PR review feedback: weak `is not None` checks let regressions slip past. Mirror python/tests/test_plans.py — logical compares `df.collect() == round_trip.collect()`; physical compares `str(original) == str(restored)`. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This is a Python library that binds to Apache Arrow in-memory query engine DataFusion.
DataFusion's Python bindings can be used as a foundation for building new data systems in Python. Here are some examples:
For tips on tuning parallelism, see Maximizing CPU Usage in the configuration guide.
The following example demonstrates running a SQL query against a Parquet file using DataFusion, storing the results in a Pandas DataFrame, and then plotting a chart.
The Parquet file used in this example can be downloaded from the following page:
from datafusion import SessionContext # Create a DataFusion context ctx = SessionContext() # Register table with context ctx.register_parquet('taxi', 'yellow_tripdata_2021-01.parquet') # Execute SQL df = ctx.sql("select passenger_count, count(*) " "from taxi " "where passenger_count is not null " "group by passenger_count " "order by passenger_count") # convert to Pandas pandas_df = df.to_pandas() # create a chart fig = pandas_df.plot(kind="bar", title="Trip Count by Number of Passengers").get_figure() fig.savefig('chart.png')
This produces the following chart:
You can use SessionContext's register_view method to convert a DataFrame into a view and register it with the context.
from datafusion import SessionContext, col, literal # Create a DataFusion context ctx = SessionContext() # Create sample data data = {"a": [1, 2, 3, 4, 5], "b": [10, 20, 30, 40, 50]} # Create a DataFrame from the dictionary df = ctx.from_pydict(data, "my_table") # Filter the DataFrame (for example, keep rows where a > 2) df_filtered = df.filter(col("a") > literal(2)) # Register the dataframe as a view with the context ctx.register_view("view1", df_filtered) # Now run a SQL query against the registered view df_view = ctx.sql("SELECT * FROM view1") # Collect the results results = df_view.collect() # Convert results to a list of dictionaries for display result_dicts = [batch.to_pydict() for batch in results] print(result_dicts)
This will output:
[{'a': [3, 4, 5], 'b': [30, 40, 50]}]
It is possible to configure runtime (memory and disk settings) and configuration settings when creating a context.
runtime = ( RuntimeEnvBuilder() .with_disk_manager_os() .with_fair_spill_pool(10000000) ) config = ( SessionConfig() .with_create_default_catalog_and_schema(True) .with_default_catalog_and_schema("foo", "bar") .with_target_partitions(8) .with_information_schema(True) .with_repartition_joins(False) .with_repartition_aggregations(False) .with_repartition_windows(False) .with_parquet_pruning(False) .set("datafusion.execution.parquet.pushdown_filters", "true") ) ctx = SessionContext(config, runtime)
Refer to the API documentation for more information.
Printing the context will show the current configuration settings.
print(ctx)
For information about how to extend DataFusion Python, please see the extensions page of the online documentation.
See examples for more information.
uv add datafusion
pip install datafusion # or python -m pip install datafusion
conda install -c conda-forge datafusion
You can verify the installation by running:
>>> import datafusion >>> datafusion.__version__ '0.6.0'
This project ships a SKILL.md that teaches AI coding assistants how to write idiomatic DataFusion Python. It follows the Agent Skills open standard.
Preferred: npx skills add apache/datafusion-python — installs the skill in Claude Code, Cursor, Windsurf, Cline, Codex, Copilot, Gemini CLI, and other supported agents.
Manual: paste this line into your project's AGENTS.md / CLAUDE.md:
For DataFusion Python code, see https://github.com/apache/datafusion-python/blob/main/skills/datafusion_python/SKILL.md
This assumes that you have rust and cargo installed. We use the workflow recommended by pyo3 and maturin. The Maturin tools used in this workflow can be installed either via uv or pip. Both approaches should offer the same experience. It is recommended to use uv since it has significant performance improvements over pip.
Currently for protobuf support either protobuf or cmake must be installed.
Bootstrap (uv):
By default uv will attempt to build the datafusion python package. For our development we prefer to build manually. This means that when creating your virtual environment using uv sync you need to pass in the additional --no-install-package datafusion and for uv run commands the additional parameter --no-project
# fetch this repo git clone git@github.com:apache/datafusion-python.git # cd to the repo root cd datafusion-python/ # create the virtual environment uv sync --dev --no-install-package datafusion # activate the environment source .venv/bin/activate
Bootstrap (pip):
# fetch this repo git clone git@github.com:apache/datafusion-python.git # cd to the repo root cd datafusion-python/ # prepare development environment (used to build wheel / install in development) python3 -m venv .venv # activate the venv source .venv/bin/activate # update pip itself if necessary python -m pip install -U pip # install dependencies python -m pip install -r pyproject.toml
The tests rely on test data in git submodules.
git submodule update --init
Whenever rust code changes (your changes or via git pull):
# make sure you activate the venv using "source venv/bin/activate" first maturin develop --uv python -m pytest
Alternatively if you are using uv you can do the following without needing to activate the virtual environment:
uv run --no-project maturin develop --uv uv run --no-project pytest
To run the FFI tests within the examples folder, after you have built datafusion-python with the previous commands:
cd examples/datafusion-ffi-example uv run --no-project maturin develop --uv uv run --no-project pytest python/tests/_test_*py
datafusion-python takes advantage of pre-commit to assist developers with code linting to help reduce the number of commits that ultimately fail in CI due to linter errors. Using the pre-commit hooks is optional for the developer but certainly helpful for keeping PRs clean and concise.
Our pre-commit hooks can be installed by running pre-commit install, which will install the configurations in your DATAFUSION_PYTHON_ROOT/.github directory and run each time you perform a commit, failing to complete the commit if an offending lint is found allowing you to make changes locally before pushing.
The pre-commit hooks can also be run adhoc without installing them by simply running pre-commit run --all-files.
NOTE: the current pre-commit hooks require docker, and cmake. See note on protobuf above.
There are scripts in ci/scripts for running Rust and Python linters.
./ci/scripts/python_lint.sh ./ci/scripts/rust_clippy.sh ./ci/scripts/rust_fmt.sh ./ci/scripts/rust_toml_fmt.sh
This project includes an AI agent skill for auditing which features from the upstream Apache DataFusion Rust library are not yet exposed in these Python bindings. This is useful when adding missing functions, auditing API coverage, or ensuring parity with upstream.
The skill accepts an optional area argument:
scalar functions aggregate functions window functions dataframe session context ffi types all
If no argument is provided, it defaults to checking all areas. The skill will fetch the upstream DataFusion documentation, compare it against the functions and methods exposed in this project, and produce a coverage report listing what is currently exposed and what is missing.
The skill definition lives in .ai/skills/check-upstream/SKILL.md and follows the Agent Skills open standard. It can be used by any AI coding agent that supports skill discovery, or followed manually.
To change test dependencies, change the pyproject.toml and run
uv sync --dev --no-install-package datafusion