<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Example Usage
In this example some simple processing is performed on the [`example.csv`](https://github.com/apache/datafusion/blob/main/datafusion/core/tests/data/example.csv) file.
Many more [code examples](https://github.com/apache/datafusion/tree/main/datafusion-examples) are included with the project.
## Add published DataFusion dependency
Find the latest available DataFusion version on [DataFusion's
crates.io] page, then add the dependency to your `Cargo.toml` file:
```toml
datafusion = "latest_version"
tokio = "1.0"
```
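With these dependencies in place, the following minimal program verifies that everything compiles and runs (a sketch; the sections below show more realistic queries):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // run a trivial query to confirm the dependency is wired up correctly
    let ctx = SessionContext::new();
    ctx.sql("SELECT 1 AS one").await?.show().await?;
    Ok(())
}
```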
## Add latest non published DataFusion dependency
DataFusion changes are published to `crates.io` according to the [release schedule](https://github.com/apache/datafusion/blob/main/dev/release/README.md#release-process).
If you need to test DataFusion changes that have been merged but not yet published, Cargo supports adding a dependency directly from a GitHub branch:
```toml
datafusion = { git = "https://github.com/apache/datafusion", branch = "main"}
```
This also works at the level of an individual crate:
```toml
datafusion-common = { git = "https://github.com/apache/datafusion", branch = "main", package = "datafusion-common"}
```
And with features:
```toml
datafusion = { git = "https://github.com/apache/datafusion", branch = "main", default-features = false, features = ["unicode_expressions"] }
```
See the Cargo documentation on [specifying dependencies](https://doc.rust-lang.org/cargo/reference/specifying-dependencies.html#specifying-dependencies) for more details.
## Run a SQL query against data stored in a CSV:
```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // register the table
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

    // create a plan to run a SQL query
    let df = ctx.sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a LIMIT 100").await?;

    // execute and print results
    df.show().await?;
    Ok(())
}
```
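If you need the results in memory rather than printed, `DataFrame::collect` executes the query and returns the results as Arrow `RecordBatch`es; a minimal sketch:

```rust
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;

    // collect the results as a Vec<RecordBatch> instead of printing them
    let batches: Vec<RecordBatch> = ctx
        .sql("SELECT a, MIN(b) FROM example WHERE a <= b GROUP BY a")
        .await?
        .collect()
        .await?;
    println!("collected {} batch(es)", batches.len());
    Ok(())
}
```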
## Use the DataFrame API to process data stored in a CSV:
```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // create the dataframe
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;

    let df = df.filter(col("a").lt_eq(col("b")))?
        .aggregate(vec![col("a")], vec![min(col("b"))])?
        .limit(0, Some(100))?;

    // execute and print results
    df.show().await?;
    Ok(())
}
```
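To inspect the plan DataFusion will run without executing the query, `DataFrame::explain(verbose, analyze)` returns a DataFrame describing the plan; a minimal sketch:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/example.csv", CsvReadOptions::new()).await?;

    // print the logical and physical plans instead of the query results
    df.filter(col("a").lt_eq(col("b")))?
        .explain(false, false)?
        .show()
        .await?;
    Ok(())
}
```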
## Output from both examples
```text
+---+--------+
| a | MIN(b) |
+---+--------+
| 1 | 2 |
+---+--------+
```
## Arrow Versions
Many of DataFusion's public APIs use types from the
[`arrow`] and [`parquet`] crates, so if you use
`arrow` in your project, the `arrow` version must match that used by
DataFusion. You can check the required version on [DataFusion's
crates.io] page.
The easiest way to ensure the versions match is to use the `arrow`
exported by DataFusion, for example:
```rust
use datafusion::arrow::datatypes::Schema;
```
For example, [DataFusion `25.0.0` dependencies] require `arrow`
`39.0.0`. If instead you used `arrow` `40.0.0` in your project you may
see errors such as:
```text
mismatched types [E0308] expected `Schema`, found `arrow_schema::Schema`
Note: `arrow_schema::Schema` and `Schema` have similar names, but are actually distinct types
Note: `arrow_schema::Schema` is defined in crate `arrow_schema`
Note: `Schema` is defined in crate `arrow_schema`
Note: perhaps two different versions of crate `arrow_schema` are being used?
Note: associated function defined here
```
Or calling `downcast_ref` on an `ArrayRef` may return `None`
unexpectedly.
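The same version mismatch is why `downcast_ref` can fail: the downcast succeeds only when the array's concrete type comes from the same `arrow` crate version as the type you name. A minimal sketch using the `arrow` re-exported by DataFusion:

```rust
use datafusion::arrow::array::{ArrayRef, Int32Array};

/// Sums an Int32 column, returning None if the downcast fails
/// (as it would if two different `arrow` versions were mixed)
fn sum_ints(array: &ArrayRef) -> Option<i64> {
    let ints = array.as_any().downcast_ref::<Int32Array>()?;
    Some(ints.iter().flatten().map(i64::from).sum())
}
```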
[`arrow`]: https://docs.rs/arrow/latest/arrow/
[`parquet`]: https://docs.rs/parquet/latest/parquet/
[datafusion's crates.io]: https://crates.io/crates/datafusion
[datafusion `25.0.0` dependencies]: https://crates.io/crates/datafusion/25.0.0/dependencies
## Identifiers and Capitalization
Please be aware that all identifiers are effectively made lower-case in SQL, so if your CSV file has capital letters (e.g., `Name`), you must put your column name in double quotes or the examples won't work.
To illustrate this behavior, consider the [`capitalized_example.csv`](../../../datafusion/core/tests/data/capitalized_example.csv) file:
## Run a SQL query against data stored in a CSV:
```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // register the table
    let ctx = SessionContext::new();
    ctx.register_csv("example", "tests/data/capitalized_example.csv", CsvReadOptions::new()).await?;

    // create a plan to run a SQL query
    let df = ctx.sql("SELECT \"A\", MIN(b) FROM example WHERE \"A\" <= c GROUP BY \"A\" LIMIT 100").await?;

    // execute and print results
    df.show().await?;
    Ok(())
}
```
## Use the DataFrame API to process data stored in a CSV:
```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // create the dataframe
    let ctx = SessionContext::new();
    let df = ctx.read_csv("tests/data/capitalized_example.csv", CsvReadOptions::new()).await?;

    let df = df
        // col will parse the input string, hence requiring double quotes to maintain the capitalization
        .filter(col("\"A\"").lt_eq(col("c")))?
        // alternatively, use ident to pass in an unqualified column name directly without parsing
        .aggregate(vec![ident("A")], vec![min(col("b"))])?
        .limit(0, Some(100))?;

    // execute and print results
    df.show().await?;
    Ok(())
}
```
## Output from both examples
```text
+---+--------+
| A | MIN(b) |
+---+--------+
| 2 | 1 |
| 1 | 2 |
+---+--------+
```
## Extensibility
DataFusion is designed to be extensible at all points. To that end, you can provide your own custom versions of any of the following (a scalar UDF is sketched after this list):
- [x] User Defined Functions (UDFs)
- [x] User Defined Aggregate Functions (UDAFs)
- [x] User Defined Table Source (`TableProvider`) for tables
- [x] User Defined `Optimizer` passes (plan rewrites)
- [x] User Defined `LogicalPlan` nodes
- [x] User Defined `ExecutionPlan` nodes
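As one concrete illustration, here is a minimal scalar UDF sketch. The exact import paths and the `create_udf` signature have shifted between DataFusion releases, so treat this as an outline rather than a drop-in implementation:

```rust
use std::sync::Arc;
use datafusion::arrow::array::{ArrayRef, Int64Array};
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::logical_expr::Volatility;
// NOTE: in some versions this helper lives under
// `datafusion::physical_plan::functions` instead
use datafusion::physical_expr::functions::make_scalar_function;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // the UDF body works on Arrow arrays: add one to each input value
    let add_one = make_scalar_function(|args: &[ArrayRef]| {
        let input = args[0]
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("int64 input");
        let result: Int64Array = input.iter().map(|v| v.map(|v| v + 1)).collect();
        Ok(Arc::new(result) as ArrayRef)
    });

    // wrap it with a name, signature, and return type, then register it
    let udf = create_udf(
        "add_one",
        vec![DataType::Int64],
        Arc::new(DataType::Int64),
        Volatility::Immutable,
        add_one,
    );
    let ctx = SessionContext::new();
    ctx.register_udf(udf);

    // the UDF is now callable from SQL
    ctx.sql("SELECT add_one(1)").await?.show().await?;
    Ok(())
}
```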
## Optimized Configuration
An optimized build requires several steps. First, add the following to your `Cargo.toml`. Note that
the settings in the `[profile.release]` section will significantly increase the build time.
```toml
[dependencies]
datafusion = { version = "22.0" }
tokio = { version = "^1.0", features = ["rt-multi-thread", "macros"] }
snmalloc-rs = "0.3"
[profile.release]
lto = true
codegen-units = 1
```
Then, in `main.rs`, update the memory allocator with the below after your imports:
```rust ,ignore
use datafusion::prelude::*;

#[global_allocator]
static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    Ok(())
}
```
Depending on the instruction set architecture you are building for, you will also want to configure `target-cpu`, ideally
with `native` or at least `avx2`:
```shell
RUSTFLAGS='-C target-cpu=native' cargo run --release
```
## Enable backtraces
By default DataFusion returns errors as a plain message. There is an option to enable more verbose details about the error,
such as an error backtrace. To enable backtraces, add the DataFusion `backtrace` feature to your `Cargo.toml` file:
```toml
datafusion = { version = "31.0.0", features = ["backtrace"]}
```
Then set the backtrace environment [variables](https://doc.rust-lang.org/std/backtrace/index.html#environment-variables):
```bash
RUST_BACKTRACE=1 ./target/debug/datafusion-cli
DataFusion CLI v31.0.0
> select row_numer() over (partition by a order by a) from (select 1 a);
Error during planning: Invalid function 'row_numer'.
Did you mean 'ROW_NUMBER'?
backtrace: 0: std::backtrace_rs::backtrace::libunwind::trace
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/../../backtrace/src/backtrace/libunwind.rs:93:5
1: std::backtrace_rs::backtrace::trace_unsynchronized
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/../../backtrace/src/backtrace/mod.rs:66:5
2: std::backtrace::Backtrace::create
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/backtrace.rs:332:13
3: std::backtrace::Backtrace::capture
at /rustc/5680fa18feaa87f3ff04063800aec256c3d4b4be/library/std/src/backtrace.rs:298:9
4: datafusion_common::error::DataFusionError::get_back_trace
at /datafusion/datafusion/common/src/error.rs:436:30
5: datafusion_sql::expr::function::<impl datafusion_sql::planner::SqlToRel<S>>::sql_function_to_expr
............
```
Backtraces are useful when debugging code. For example, given this test in `datafusion/core/src/physical_planner.rs`:

```rust
#[tokio::test]
async fn test_get_backtrace_for_failed_code() -> Result<()> {
    let ctx = SessionContext::new();

    let sql = "
        select row_numer() over (partition by a order by a) from (select 1 a);
    ";

    let _ = ctx.sql(sql).await?.collect().await?;
    Ok(())
}
```
To obtain a backtrace:
```bash
cargo build --features=backtrace
RUST_BACKTRACE=1 cargo test --features=backtrace --package datafusion --lib -- physical_planner::tests::test_get_backtrace_for_failed_code --exact --nocapture
```
Note: the backtrace is wrapped in system calls, so the first few frames at the top of the backtrace can be ignored.
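Backtraces can also be observed programmatically: because the backtrace is embedded in the error message when the `backtrace` feature is enabled, printing the error is usually enough. A minimal sketch (assuming the feature is enabled and `RUST_BACKTRACE=1` is set):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() {
    let ctx = SessionContext::new();
    // this query fails to plan; with the `backtrace` feature enabled,
    // the printed error message includes the backtrace shown above
    match ctx.sql("select row_numer() over (partition by a order by a) from (select 1 a)").await {
        Ok(_) => println!("query planned unexpectedly"),
        Err(e) => println!("{e}"),
    }
}
```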