commit | 3a62721bc9f677dac815db50cb5d958fa34a3afa | [log] [tgz] |
---|---|---|
author | Francis Du <francis@francis.run> | Thu Sep 08 21:33:45 2022 +0800 |
committer | GitHub <noreply@github.com> | Thu Sep 08 07:33:45 2022 -0600 |
tree | fc010c106f4183d8b860f981a72e4af8333de31c | |
parent | b390d08641c0ee64a3042b9401349a1f569876c2 [diff] |
[SessionContext] - Add table_exist funcation for session context (#48) * feat: add table_exist funcation for dataframe * fix: flake E712
This is a Python library that binds to Apache Arrow in-memory query engine DataFusion.
Like pyspark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, parquet or CSV files, run it in a multi-threaded environment, and obtain the result back in Python.
It also allows you to use UDFs and UDAFs for complex operations.
The major advantage of this library over other execution engines is that this library achieves zero-copy between Python and its execution engine: there is no cost in using UDFs, UDAFs, and collecting the results to Python apart from having to lock the GIL when running those operations.
Its query engine, DataFusion, is written in Rust, which makes strong assumptions about thread safety and lack of memory leaks.
Technically, zero-copy is achieved via the c data interface.
Simple usage:
import datafusion from datafusion import functions as f from datafusion import col import pyarrow # create a context ctx = datafusion.SessionContext() # create a RecordBatch and a new DataFrame from it batch = pyarrow.RecordBatch.from_arrays( [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])], names=["a", "b"], ) df = ctx.create_dataframe([[batch]]) # create a new statement df = df.select( col("a") + col("b"), col("a") - col("b"), ) # execute and collect the first (and only) batch result = df.collect()[0] assert result.column(0) == pyarrow.array([5, 7, 9]) assert result.column(1) == pyarrow.array([-3, -3, -3])
from datafusion import udf def is_null(array: pyarrow.Array) -> pyarrow.Array: return array.is_null() is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable') df = df.select(is_null_arr(col("a"))) result = df.collect() assert result.column(0) == pyarrow.array([False] * 3)
import pyarrow import pyarrow.compute from datafusion import udaf, Accumulator class MyAccumulator(Accumulator): """ Interface of a user-defined accumulation. """ def __init__(self): self._sum = pyarrow.scalar(0.0) def update(self, values: pyarrow.Array) -> None: # not nice since pyarrow scalars can't be summed yet. This breaks on `None` self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py()) def merge(self, states: pyarrow.Array) -> None: # not nice since pyarrow scalars can't be summed yet. This breaks on `None` self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py()) def state(self) -> pyarrow.Array: return pyarrow.array([self._sum.as_py()]) def evaluate(self) -> pyarrow.Scalar: return self._sum df = ctx.create_dataframe([[batch]]) my_udaf = udaf(MyAccumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()], 'stable') df = df.aggregate( [], [my_udaf(col("a"))] ) result = df.collect()[0] assert result.column(0) == pyarrow.array([6.0])
pip install datafusion # or python -m pip install datafusion
You can verify the installation by running:
>>> import datafusion >>> datafusion.__version__ '0.6.0'
This assumes that you have rust and cargo installed. We use the workflow recommended by pyo3 and maturin.
Bootstrap:
# fetch this repo git clone git@github.com:apache/arrow-datafusion-python.git # prepare development environment (used to build wheel / install in development) python3 -m venv venv # activate the venv source venv/bin/activate # update pip itself if necessary python -m pip install -U pip # install dependencies (for Python 3.8+) python -m pip install -r requirements-310.txt
Whenever rust code changes (your changes or via git pull
):
# make sure you activate the venv using "source venv/bin/activate" first maturin develop python -m pytest
To change test dependencies, change the requirements.in
and run
# install pip-tools (this can be done only once), also consider running in venv python -m pip install pip-tools python -m piptools compile --generate-hashes -o requirements-310.txt
To update dependencies, run with -U
python -m piptools compile -U --generate-hashes -o requirements-310.txt
More details here