blob: e7fce1921e7a8a38f88cfe7af56742484fca5175 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Benchmark for sort pushdown optimization.
//!
//! Tests performance of sort elimination when files are non-overlapping and
//! internally sorted (declared via `--sorted` / `WITH ORDER`).
//!
//! Queries are loaded from external SQL files under `queries/sort_pushdown/`
//! so they can also be run directly with `datafusion-cli`.
//!
//! # Usage
//!
//! ```text
//! # Prepare sorted TPCH lineitem data (SF=1)
//! ./bench.sh data sort_pushdown
//!
//! # Baseline (no WITH ORDER, full SortExec)
//! ./bench.sh run sort_pushdown
//!
//! # With sort elimination (WITH ORDER, SortExec removed)
//! ./bench.sh run sort_pushdown_sorted
//! ```
use clap::Args;
use futures::StreamExt;
use std::path::PathBuf;
use std::sync::Arc;
use datafusion::datasource::TableProvider;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion::prelude::*;
use datafusion_common::DEFAULT_PARQUET_EXTENSION;
use datafusion_common::instant::Instant;
use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats};
/// Default path to query files, relative to the benchmark root.
/// Overridable at runtime via the `--queries-path` option.
const SORT_PUSHDOWN_QUERY_DIR: &str = "queries/sort_pushdown";
// Command-line options for the sort-pushdown benchmark, parsed with clap's
// derive API. NOTE(review): the `///` doc comments on the fields below are
// emitted verbatim by clap as `--help` text, i.e. they are program output —
// they are deliberately left byte-identical here.
#[derive(Debug, Args)]
pub struct RunOpt {
/// Common options
#[command(flatten)]
common: CommonOpt,
/// Sort pushdown query number (1-4). If not specified, runs all queries
#[arg(short, long)]
pub query: Option<usize>,
/// Path to data files (lineitem). Only parquet format is supported.
// Root data directory; `get_table` appends `/{table}` to this path.
#[arg(required = true, short = 'p', long = "path")]
path: PathBuf,
/// Path to JSON benchmark result to be compared using `compare.py`
// When None, `maybe_write_json` writes nothing.
#[arg(short = 'o', long = "output")]
output_path: Option<PathBuf>,
/// Path to directory containing query SQL files (q1.sql, q2.sql, ...).
/// Defaults to `queries/sort_pushdown/` relative to current directory.
#[arg(long = "queries-path")]
queries_path: Option<PathBuf>,
/// Mark the first column (l_orderkey) as sorted via WITH ORDER.
/// When set, enables sort elimination for matching queries.
// Drives the `with_file_sort_order` call in `get_table`.
#[arg(short = 't', long = "sorted")]
sorted: bool,
}
impl RunOpt {
const TABLES: [&'static str; 1] = ["lineitem"];
fn queries_dir(&self) -> PathBuf {
self.queries_path
.clone()
.unwrap_or_else(|| PathBuf::from(SORT_PUSHDOWN_QUERY_DIR))
}
fn load_query(&self, query_id: usize) -> Result<String> {
let path = self.queries_dir().join(format!("q{query_id}.sql"));
std::fs::read_to_string(&path).map_err(|e| {
datafusion_common::DataFusionError::Execution(format!(
"Failed to read query file {}: {e}",
path.display()
))
})
}
fn available_queries(&self) -> Vec<usize> {
let dir = self.queries_dir();
let mut ids = Vec::new();
if let Ok(entries) = std::fs::read_dir(&dir) {
for entry in entries.flatten() {
let name = entry.file_name();
let name = name.to_string_lossy();
if let Some(rest) = name.strip_prefix('q')
&& let Some(num_str) = rest.strip_suffix(".sql")
&& let Ok(id) = num_str.parse::<usize>()
{
ids.push(id);
}
}
}
ids.sort();
ids
}
pub async fn run(&self) -> Result<()> {
let mut benchmark_run = BenchmarkRun::new();
let query_ids = match self.query {
Some(query_id) => vec![query_id],
None => self.available_queries(),
};
for query_id in query_ids {
benchmark_run.start_new_case(&format!("{query_id}"));
let query_results = self.benchmark_query(query_id).await;
match query_results {
Ok(query_results) => {
for iter in query_results {
benchmark_run.write_iter(iter.elapsed, iter.row_count);
}
}
Err(e) => {
benchmark_run.mark_failed();
eprintln!("Query {query_id} failed: {e}");
}
}
}
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
benchmark_run.maybe_print_failures();
Ok(())
}
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let sql = self.load_query(query_id)?;
let config = self.common.config()?;
let rt = self.common.build_runtime()?;
let state = SessionStateBuilder::new()
.with_config(config)
.with_runtime_env(rt)
.with_default_features()
.build();
let ctx = SessionContext::from(state);
self.register_tables(&ctx).await?;
let mut millis = vec![];
let mut query_results = vec![];
for i in 0..self.iterations() {
let start = Instant::now();
let row_count = self.execute_query(&ctx, sql.as_str()).await?;
let elapsed = start.elapsed();
let ms = elapsed.as_secs_f64() * 1000.0;
millis.push(ms);
println!(
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
);
query_results.push(QueryResult { elapsed, row_count });
}
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {query_id} avg time: {avg:.2} ms");
print_memory_stats();
Ok(query_results)
}
async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
for table in Self::TABLES {
let table_provider = self.get_table(ctx, table).await?;
ctx.register_table(table, table_provider)?;
}
Ok(())
}
async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result<usize> {
let debug = self.common.debug;
let plan = ctx.sql(sql).await?;
let (state, plan) = plan.into_parts();
if debug {
println!("=== Logical plan ===\n{plan}\n");
}
let plan = state.optimize(&plan)?;
if debug {
println!("=== Optimized logical plan ===\n{plan}\n");
}
let physical_plan = state.create_physical_plan(&plan).await?;
if debug {
println!(
"=== Physical plan ===\n{}\n",
displayable(physical_plan.as_ref()).indent(true)
);
}
let mut row_count = 0;
let mut stream = execute_stream(physical_plan.clone(), state.task_ctx())?;
while let Some(batch) = stream.next().await {
row_count += batch?.num_rows();
}
if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
.indent(true)
);
}
Ok(row_count)
}
async fn get_table(
&self,
ctx: &SessionContext,
table: &str,
) -> Result<Arc<dyn TableProvider>> {
let path = self.path.to_str().unwrap();
let state = ctx.state();
let path = format!("{path}/{table}");
let format = Arc::new(
ParquetFormat::default()
.with_options(ctx.state().table_options().parquet.clone()),
);
let extension = DEFAULT_PARQUET_EXTENSION;
let options = ListingOptions::new(format)
.with_file_extension(extension)
.with_collect_stat(true); // Always collect statistics for sort pushdown
let table_path = ListingTableUrl::parse(path)?;
let schema = options.infer_schema(&state, &table_path).await?;
let options = if self.sorted {
let key_column_name = schema.fields()[0].name();
options
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
} else {
options
};
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(schema);
Ok(Arc::new(ListingTable::try_new(config)?))
}
fn iterations(&self) -> usize {
self.common.iterations
}
}