You'll need to have both Rust and Cargo installed.
We will follow the development workflow outlined by datafusion-python, PyO3 and maturin.
The tools used in this workflow, such as maturin, can be installed with either uv or pip. Both approaches should offer the same experience, but uv is recommended because it is significantly faster than pip.
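Before bootstrapping, it can help to confirm that the toolchain is on PATH. The script below is a minimal sketch and is not part of datafusion-ray (check_toolchain and missing_requirements are hypothetical names). It uses shutil.which to look for rustc, cargo, uv and pip. Because it only looks for executables on PATH, a pip that is reachable solely through python -m pip would be reported as missing.

```python
import shutil

# Hypothetical preflight helper (not part of datafusion-ray): reports which of
# the command-line tools this workflow relies on can be found on PATH.
REQUIRED_TOOLS = ("rustc", "cargo")
INSTALLER_TOOLS = ("uv", "pip")


def check_toolchain() -> dict[str, bool]:
    """Map each tool name to whether an executable with that name is on PATH."""
    return {tool: shutil.which(tool) is not None for tool in REQUIRED_TOOLS + INSTALLER_TOOLS}


def missing_requirements(found: dict[str, bool]) -> list[str]:
    """Return human-readable problems; an empty list means the basics are present."""
    problems = [f"{tool} not found on PATH" for tool in REQUIRED_TOOLS if not found.get(tool)]
    # Either installer is enough; only complain if both are absent.
    if not any(found.get(tool) for tool in INSTALLER_TOOLS):
        problems.append("neither uv nor pip found on PATH")
    return problems


if __name__ == "__main__":
    status = check_toolchain()
    for tool, ok in status.items():
        print(f"{tool:6} {'ok' if ok else 'missing'}")
    for problem in missing_requirements(status):
        print("warning:", problem)
```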
Bootstrap (uv):
By default, uv will attempt to build the datafusion-ray Python package. For development we prefer to build it manually, so when creating your virtual environment with uv sync you need to pass the additional flag --no-install-package datafusion-ray. This tells uv to install all of the dependencies listed in pyproject.toml but skip building datafusion-ray, which we will do manually.
# fetch this repo
git clone git@github.com:apache/datafusion-ray.git
# go to repo root
cd datafusion-ray
# create the virtual environment
uv sync --dev --no-install-package datafusion-ray
# activate the environment
source .venv/bin/activate
Bootstrap (pip):
# fetch this repo
git clone git@github.com:apache/datafusion-ray.git
# go to repo root
cd datafusion-ray
# prepare development environment (used to build wheel / install in development)
python3 -m venv .venv
# activate the venv
source .venv/bin/activate
# update pip itself if necessary
python -m pip install -U pip
# install dependencies
python -m pip install -r pyproject.toml
Whenever the Rust code changes (your changes or via git pull):
# make sure you activate the venv using "source .venv/bin/activate" first
maturin develop --uv
python -m pytest
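If you rebuild often, the two commands can be wrapped in a small script. This is a hypothetical sketch (rebuild_commands and rebuild_and_test are illustrative names, not part of datafusion-ray). It passes --uv to maturin develop only when uv is on PATH, on the assumption that an environment created with the pip bootstrap may not have uv available, while environments created by uv do not include pip by default. It stops at the first failing step.

```python
import shutil
import subprocess
import sys

# Hypothetical helper (not part of datafusion-ray) that automates the
# "rebuild, then test" loop. Run it from the repository root with the
# virtual environment already activated.


def rebuild_commands(use_uv: bool) -> list[list[str]]:
    """Build the command lines: rebuild the extension in place, then run the tests."""
    develop = ["maturin", "develop"]
    if use_uv:
        # Ask maturin to install the freshly built package with uv instead of pip.
        develop.append("--uv")
    # sys.executable is the interpreter running this script (the venv's, if activated).
    return [develop, [sys.executable, "-m", "pytest"]]


def rebuild_and_test() -> None:
    for cmd in rebuild_commands(use_uv=shutil.which("uv") is not None):
        print("+", " ".join(cmd))
        # check=True raises on the first failure, so tests never run against a stale build.
        subprocess.run(cmd, check=True)
```

Call rebuild_and_test() from the repository root after activating the virtual environment.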
From the examples directory, run:
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tips.py --data-dir=$(pwd)/../testdata/tips/
From the tpch directory, use make_data.py to create a TPCH dataset at a provided scale factor, then run:
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --qnum 2
This executes TPCH query #2. To execute an arbitrary query against the TPCH dataset, provide it with --query instead of --qnum. This is useful for validating the plans that DataFusion for Ray will create.
For example, to execute the following query:
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --query 'select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1'
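For scripting repeated runs, these flag combinations can be assembled programmatically. The helper below is a hypothetical sketch, not part of datafusion-ray: tpc_command and shell_line are invented names. It only builds the argument list and a shell-quoted line for the flags shown above, and it assumes that exactly one of --qnum and --query should be given. It does not run anything.

```python
import shlex
from typing import Optional

# Hypothetical helper (not part of datafusion-ray): assembles a tpc.py invocation
# from the flags shown above. It only builds the command; it does not run it.
RAY_ENV = {"RAY_COLOR_PREFIX": "1", "RAY_DEDUP_LOGS": "0"}


def tpc_command(
    data: str,
    *,
    qnum: Optional[int] = None,
    query: Optional[str] = None,
    concurrency: int = 2,
    batch_size: int = 8182,
    worker_pool_min: int = 10,
) -> list[str]:
    # Assumption: pass exactly one of --qnum (a numbered TPCH query) or --query (arbitrary SQL).
    if (qnum is None) == (query is None):
        raise ValueError("pass exactly one of qnum or query")
    cmd = [
        "python", "tpc.py",
        f"--data={data}",
        f"--concurrency={concurrency}",
        f"--batch-size={batch_size}",
        f"--worker-pool-min={worker_pool_min}",
    ]
    cmd += ["--qnum", str(qnum)] if qnum is not None else ["--query", query]
    return cmd


def shell_line(cmd: list[str]) -> str:
    """Prefix the Ray environment variables and quote arguments for a POSIX shell."""
    env = " ".join(f"{k}={v}" for k, v in RAY_ENV.items())
    return f"{env} {shlex.join(cmd)}"
```

shlex.join takes care of quoting the SQL text, so a query containing spaces or quotes arrives at tpc.py as a single argument.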
To further parallelize execution, you can choose how many partitions each Stage will serve with --partitions-per-processor. If this number is less than --concurrency, then multiple Actors will host portions of the stage. For example, if 10 stages are calculated for a query with concurrency=16 and partitions-per-processor=4, each stage is split across 16 / 4 = 4 Actors, so 40 RayStage Actors will be created. If partitions-per-processor=16 or is absent, then 10 RayStage Actors will be created.
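The arithmetic above can be written out as a small function. It is a sketch under one assumption the text does not state: when concurrency is not a multiple of partitions-per-processor, the per-stage Actor count is rounded up. raystage_actor_count is a hypothetical name, not part of datafusion-ray.

```python
import math
from typing import Optional

# Sketch of the RayStage Actor arithmetic described above.
# Assumption: each stage gets ceil(concurrency / partitions_per_processor) Actors,
# and an absent or >= concurrency value means one Actor per stage.


def raystage_actor_count(stages: int, concurrency: int, partitions_per_processor: Optional[int] = None) -> int:
    if partitions_per_processor is None or partitions_per_processor >= concurrency:
        actors_per_stage = 1
    else:
        actors_per_stage = math.ceil(concurrency / partitions_per_processor)
    return stages * actors_per_stage


print(raystage_actor_count(10, 16, 4))   # 40
print(raystage_actor_count(10, 16, 16))  # 10
print(raystage_actor_count(10, 16))      # 10
```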
To validate the output against non-Ray, single-node DataFusion, add --validate, which checks that both systems produce the same output.
To run the entire TPCH benchmark, use:
RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 [--partitions-per-processor=] [--validate]
This will write a JSON file with query timings to the current directory.
DataFusion for Ray's logging output is determined by the DATAFUSION_RAY_LOG_LEVEL environment variable. The default log level is WARN. To change the log level, set the environment variable to one of the following values: ERROR, WARN, INFO, DEBUG, or TRACE.
DataFusion for Ray outputs logs from both Python and Rust. To handle them consistently, the Python logger for datafusion_ray is routed to Rust for logging. The RUST_LOG environment variable can be used to control Rust log output other than that of datafusion_ray.
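When launching runs from Python, for example with subprocess.run(..., env=...), the log settings can be passed through the child process's environment. This is a hypothetical sketch (logging_env is an invented name, not part of datafusion-ray). It only checks the level against the five values listed above and leaves the RUST_LOG value, such as info, to the caller.

```python
import os
from typing import Mapping, Optional

# Hypothetical helper (not part of datafusion-ray): builds an environment mapping
# for a child process with DataFusion for Ray's log level set.
LOG_LEVELS = ("ERROR", "WARN", "INFO", "DEBUG", "TRACE")


def logging_env(
    level: str = "WARN",
    rust_log: Optional[str] = None,
    base: Optional[Mapping[str, str]] = None,
) -> dict[str, str]:
    level = level.upper()
    if level not in LOG_LEVELS:
        raise ValueError(f"unsupported log level {level!r}; expected one of {LOG_LEVELS}")
    # Copy so the caller's mapping (or os.environ) is never modified.
    env = dict(os.environ if base is None else base)
    # Log level for datafusion_ray output (its Python logger is routed to Rust).
    env["DATAFUSION_RAY_LOG_LEVEL"] = level
    if rust_log is not None:
        # RUST_LOG governs Rust log output other than datafusion_ray's.
        env["RUST_LOG"] = rust_log
    return env
```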
table_parquet_options.pushdown_filters=true after deserialization to compensate. This will be refactored in the future.