| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::fs; |
| use std::io::ErrorKind; |
| use std::path::{Path, PathBuf}; |
| |
| use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats}; |
| use clap::Args; |
| use datafusion::logical_expr::{ExplainFormat, ExplainOption}; |
| use datafusion::{ |
| error::{DataFusionError, Result}, |
| prelude::SessionContext, |
| }; |
| use datafusion_common::exec_datafusion_err; |
| use datafusion_common::instant::Instant; |
| |
/// SQL to create the hits view with proper EventDate casting.
///
/// ClickBench stores EventDate as UInt16 (days since 1970-01-01) for
/// storage efficiency (2 bytes vs 4-8 bytes for date types).
/// This view transforms it to SQL DATE type for query compatibility.
///
/// Relies on DataFusion's `SELECT * EXCEPT (..)` syntax to drop the raw
/// column before re-adding the casted replacement, and on a `hits_raw`
/// table having been registered first (see `RunOpt::register_hits`).
const HITS_VIEW_DDL: &str = r#"CREATE VIEW hits AS
SELECT * EXCEPT ("EventDate"),
CAST(CAST("EventDate" AS INTEGER) AS DATE) AS "EventDate"
FROM hits_raw"#;
| |
/// Driver program to run the ClickBench benchmark
///
/// The ClickBench[1] benchmarks are widely cited in the industry and
/// focus on grouping / aggregation / filtering. This runner uses the
/// scripts and queries from [2].
///
/// [1]: https://github.com/ClickHouse/ClickBench
/// [2]: https://github.com/ClickHouse/ClickBench/tree/main/datafusion
//
// NOTE(review): because of `verbatim_doc_comment`, every `///` comment in
// this struct is rendered verbatim as `--help` output — edit them with the
// CLI user in mind, not just rustdoc.
#[derive(Debug, Args, Clone)]
#[command(verbatim_doc_comment)]
pub struct RunOpt {
    /// Query number (between 0 and 42). If not specified, runs all queries
    #[arg(short, long)]
    pub query: Option<usize>,

    // Also force-enabled by `run()` whenever --sorted-by is given.
    /// If specified, enables Parquet Filter Pushdown.
    ///
    /// Specifically, it enables:
    /// * `pushdown_filters = true`
    /// * `reorder_filters = true`
    #[arg(long = "pushdown")]
    pushdown: bool,

    /// Common options
    #[command(flatten)]
    common: CommonOpt,

    /// Path to hits.parquet (single file) or `hits_partitioned`
    /// (partitioned, 100 files)
    #[arg(
        short = 'p',
        long = "path",
        default_value = "benchmarks/data/hits.parquet"
    )]
    path: PathBuf,

    /// Path to queries directory
    #[arg(
        short = 'r',
        long = "queries-path",
        default_value = "benchmarks/queries/clickbench/queries"
    )]
    pub queries_path: PathBuf,

    /// If present, write results json here
    #[arg(short = 'o', long = "output")]
    output_path: Option<PathBuf>,

    // When set, register_hits() switches from register_parquet() to a
    // CREATE EXTERNAL TABLE ... WITH ORDER statement.
    /// Column name that the data is sorted by (e.g., "EventTime")
    /// If specified, DataFusion will be informed that the data has this sort order
    /// using CREATE EXTERNAL TABLE with WITH ORDER clause.
    ///
    /// Recommended to use with: -c datafusion.optimizer.prefer_existing_sort=true
    /// This allows DataFusion to optimize away redundant sorts while maintaining
    /// multi-core parallelism for other operations.
    #[arg(long = "sorted-by")]
    sorted_by: Option<String>,

    // NOTE(review): upper-cased before use but not validated; any value
    // other than ASC/DESC ends up verbatim in the generated DDL.
    /// Sort order: ASC or DESC (default: ASC)
    #[arg(long = "sort-order", default_value = "ASC")]
    sort_order: String,

    /// Configuration options in the format key=value
    /// Can be specified multiple times.
    ///
    /// Example: -c datafusion.optimizer.prefer_existing_sort=true
    #[arg(short = 'c', long = "config")]
    config_options: Vec<String>,
}
| |
/// Get the SQL file path
///
/// Builds `<query_dir>/q<query>.sql` for the given query number.
pub fn get_query_path(query_dir: &Path, query: usize) -> PathBuf {
    query_dir.join(format!("q{query}.sql"))
}
| |
| /// Get the SQL statement from the specified query file |
| pub fn get_query_sql(query_path: &Path) -> Result<Option<String>> { |
| if fs::exists(query_path)? { |
| Ok(Some(fs::read_to_string(query_path)?)) |
| } else { |
| Ok(None) |
| } |
| } |
| |
impl RunOpt {
    /// Entry point for the benchmark: validates the queries directory,
    /// builds the session configuration, registers the data, then runs
    /// the selected query (or all queries) and records timing results.
    pub async fn run(self) -> Result<()> {
        println!("Running benchmarks with the following options: {self:?}");

        // Fail fast with a friendly message if the queries directory is
        // missing; any other metadata error is wrapped and propagated.
        let query_dir_metadata = fs::metadata(&self.queries_path).map_err(|e| {
            if e.kind() == ErrorKind::NotFound {
                exec_datafusion_err!(
                    "Query path '{}' does not exist.",
                    &self.queries_path.to_str().unwrap()
                )
            } else {
                DataFusionError::External(Box::new(e))
            }
        })?;

        if !query_dir_metadata.is_dir() {
            return Err(exec_datafusion_err!(
                "Query path '{}' is not a directory.",
                &self.queries_path.to_str().unwrap()
            ));
        }

        // With no explicit query, the range is effectively unbounded:
        // the loop below breaks on the first q<N>.sql file that does not
        // exist, so usize::MAX is never actually reached.
        let query_range = match self.query {
            Some(query_id) => query_id..=query_id,
            None => 0..=usize::MAX,
        };

        // configure parquet options
        let mut config = self.common.config()?;

        if self.sorted_by.is_some() {
            println!("ℹ️ Data is registered with sort order");

            // Advisory only: detect whether the user already passed
            // prefer_existing_sort; we do not set it on their behalf.
            let has_prefer_sort = self
                .config_options
                .iter()
                .any(|opt| opt.contains("prefer_existing_sort=true"));

            if !has_prefer_sort {
                println!(
                    "ℹ️ Consider using -c datafusion.optimizer.prefer_existing_sort=true"
                );
                println!("ℹ️ to optimize queries while maintaining parallelism");
            }
        }

        // Apply user-provided configuration options
        for config_opt in &self.config_options {
            // splitn(2, '=') keeps any '=' inside the value intact.
            let parts: Vec<&str> = config_opt.splitn(2, '=').collect();
            if parts.len() != 2 {
                return Err(exec_datafusion_err!(
                    "Invalid config option format: '{}'. Expected 'key=value'",
                    config_opt
                ));
            }
            let key = parts[0];
            let value = parts[1];

            println!("Setting config: {key} = {value}");
            config = config.set_str(key, value);
        }

        // Scoped so the mutable borrow of config ends before the config
        // is moved into the SessionContext below.
        {
            let parquet_options = &mut config.options_mut().execution.parquet;
            // The hits_partitioned dataset specifies string columns
            // as binary due to how it was written. Force it to strings
            parquet_options.binary_as_string = true;

            // Turn on Parquet filter pushdown if requested
            if self.pushdown {
                parquet_options.pushdown_filters = true;
                parquet_options.reorder_filters = true;
            }

            if self.sorted_by.is_some() {
                // We should compare the dynamic topk optimization when data is sorted, so we make the
                // assumption that filter pushdown is also enabled in this case.
                parquet_options.pushdown_filters = true;
                parquet_options.reorder_filters = true;
            }
        }

        let rt_builder = self.common.runtime_env_builder()?;
        let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);

        // Registers `hits_raw` and the `hits` view on top of it.
        self.register_hits(&ctx).await?;

        let mut benchmark_run = BenchmarkRun::new();
        for query_id in query_range {
            let query_path = get_query_path(&self.queries_path, query_id);
            let Some(sql) = get_query_sql(&query_path)? else {
                // A missing file is an error only when the user asked for
                // that specific query; otherwise it marks the end of the
                // query set when running all queries.
                if self.query.is_some() {
                    return Err(exec_datafusion_err!(
                        "Could not load query file '{}'.",
                        &query_path.to_str().unwrap()
                    ));
                }
                break;
            };
            benchmark_run.start_new_case(&format!("Query {query_id}"));
            let query_run = self.benchmark_query(&sql, query_id, &ctx).await;
            match query_run {
                Ok(query_results) => {
                    for iter in query_results {
                        benchmark_run.write_iter(iter.elapsed, iter.row_count);
                    }
                }
                // A failed query is recorded and reported but does not
                // abort the remaining queries.
                Err(e) => {
                    benchmark_run.mark_failed();
                    eprintln!("Query {query_id} failed: {e}");
                }
            }
        }
        benchmark_run.maybe_write_json(self.output_path.as_ref())?;
        benchmark_run.maybe_print_failures();
        Ok(())
    }

    /// Runs `sql` for the configured number of iterations, printing and
    /// returning per-iteration elapsed time and row count.
    ///
    /// Each timed iteration covers planning + execution + full result
    /// collection. In debug mode the plan is explained once afterwards,
    /// outside the timed loop.
    async fn benchmark_query(
        &self,
        sql: &str,
        query_id: usize,
        ctx: &SessionContext,
    ) -> Result<Vec<QueryResult>> {
        println!("Q{query_id}: {sql}");

        let mut millis = Vec::with_capacity(self.iterations());
        let mut query_results = vec![];
        for i in 0..self.iterations() {
            let start = Instant::now();
            let results = ctx.sql(sql).await?.collect().await?;
            let elapsed = start.elapsed();
            let ms = elapsed.as_secs_f64() * 1000.0;
            millis.push(ms);
            let row_count: usize = results.iter().map(|b| b.num_rows()).sum();
            println!(
                "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
            );
            query_results.push(QueryResult { elapsed, row_count })
        }
        if self.common.debug {
            ctx.sql(sql)
                .await?
                .explain_with_options(
                    ExplainOption::default().with_format(ExplainFormat::Tree),
                )?
                .show()
                .await?;
        }
        // NOTE(review): assumes at least one iteration; with 0 iterations
        // this is 0/0 (NaN) — iterations presumably defaults to >= 1.
        let avg = millis.iter().sum::<f64>() / millis.len() as f64;
        println!("Query {query_id} avg time: {avg:.2} ms");

        // Print memory usage stats using mimalloc (only when compiled with --features mimalloc_extended)
        print_memory_stats();

        Ok(query_results)
    }

    /// Registers the `hits.parquet` as a table named `hits`
    /// If sorted_by is specified, uses CREATE EXTERNAL TABLE with WITH ORDER
    async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
        // NOTE(review): panics on a non-UTF-8 path; acceptable for a
        // benchmark driver but worth knowing.
        let path = self.path.as_os_str().to_str().unwrap();

        // If sorted_by is specified, use CREATE EXTERNAL TABLE with WITH ORDER
        if let Some(ref sort_column) = self.sorted_by {
            println!(
                "Registering table with sort order: {} {}",
                sort_column, self.sort_order
            );

            // Quote the column name unless the caller already supplied
            // quoting: any embedded '"' disables the automatic wrapping.
            let escaped_column = if sort_column.contains('"') {
                sort_column.clone()
            } else {
                format!("\"{sort_column}\"")
            };

            // Build CREATE EXTERNAL TABLE DDL with WITH ORDER clause
            // Schema will be automatically inferred from the Parquet file
            let create_table_sql = format!(
                "CREATE EXTERNAL TABLE hits_raw \
                 STORED AS PARQUET \
                 LOCATION '{}' \
                 WITH ORDER ({} {})",
                path,
                escaped_column,
                self.sort_order.to_uppercase()
            );

            println!("Executing: {create_table_sql}");

            // Execute the CREATE EXTERNAL TABLE statement
            ctx.sql(&create_table_sql).await?.collect().await?;
        } else {
            // Original registration without sort order
            let options = Default::default();
            ctx.register_parquet("hits_raw", path, options)
                .await
                .map_err(|e| {
                    // Add the path to the error so registration failures
                    // are actionable.
                    DataFusionError::Context(
                        format!("Registering 'hits_raw' as {path}"),
                        Box::new(e),
                    )
                })?;
        }

        // Create the hits view with EventDate transformation
        Self::create_hits_view(ctx).await
    }

    /// Creates the hits view with EventDate transformation from UInt16 to DATE.
    ///
    /// ClickBench encodes EventDate as UInt16 days since epoch (1970-01-01).
    async fn create_hits_view(ctx: &SessionContext) -> Result<()> {
        ctx.sql(HITS_VIEW_DDL).await?.collect().await.map_err(|e| {
            DataFusionError::Context(
                "Creating 'hits' view with EventDate transformation".to_string(),
                Box::new(e),
            )
        })?;
        Ok(())
    }

    /// Number of timed iterations to run per query (from common options).
    fn iterations(&self) -> usize {
        self.common.iterations
    }
}