blob: cc59b7803036082d26fd52c5846629d10f4dbba4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::path::PathBuf;
use std::sync::Arc;
use super::{
get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_QUERY_END_ID,
TPCH_QUERY_START_ID, TPCH_TABLES,
};
use crate::util::{print_memory_stats, BenchmarkRun, CommonOpt, QueryResult};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty::{self, pretty_format_batches};
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::file_format::FileFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use datafusion_common::utils::get_available_parallelism;
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
use log::info;
use structopt::StructOpt;
// hack to avoid `default_value is meaningless for bool` errors
type BoolDefaultTrue = bool;
/// Run the tpch benchmark.
///
/// This benchmarks is derived from the [TPC-H][1] version
/// [2.17.1]. The data and answers are generated using `tpch-gen` from
/// [2].
///
/// [1]: http://www.tpc.org/tpch/
/// [2]: https://github.com/databricks/tpch-dbgen.git
/// [2.17.1]: https://www.tpc.org/tpc_documents_current_versions/pdf/tpc-h_v2.17.1.pdf
#[derive(Debug, StructOpt, Clone)]
#[structopt(verbatim_doc_comment)]
pub struct RunOpt {
/// Query number. If not specified, runs all queries
#[structopt(short, long)]
pub query: Option<usize>,
/// Common options
#[structopt(flatten)]
common: CommonOpt,
/// Path to data files
#[structopt(parse(from_os_str), required = true, short = "p", long = "path")]
path: PathBuf,
/// File format: `csv` or `parquet`
#[structopt(short = "f", long = "format", default_value = "csv")]
file_format: String,
/// Load the data into a MemTable before executing the query
#[structopt(short = "m", long = "mem-table")]
mem_table: bool,
/// Path to machine readable output file
#[structopt(parse(from_os_str), short = "o", long = "output")]
output_path: Option<PathBuf>,
/// Whether to disable collection of statistics (and cost based optimizations) or not.
#[structopt(short = "S", long = "disable-statistics")]
disable_statistics: bool,
/// If true then hash join used, if false then sort merge join
/// True by default.
#[structopt(short = "j", long = "prefer_hash_join", default_value = "true")]
prefer_hash_join: BoolDefaultTrue,
/// If true then Piecewise Merge Join can be used, if false then it will opt for Nested Loop Join
/// True by default.
#[structopt(
short = "j",
long = "enable_piecewise_merge_join",
default_value = "false"
)]
enable_piecewise_merge_join: BoolDefaultTrue,
/// Mark the first column of each table as sorted in ascending order.
/// The tables should have been created with the `--sort` option for this to have any effect.
#[structopt(short = "t", long = "sorted")]
sorted: bool,
}
impl RunOpt {
pub async fn run(self) -> Result<()> {
println!("Running benchmarks with the following options: {self:?}");
let query_range = match self.query {
Some(query_id) => query_id..=query_id,
None => TPCH_QUERY_START_ID..=TPCH_QUERY_END_ID,
};
let mut benchmark_run = BenchmarkRun::new();
let mut config = self
.common
.config()?
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
config.options_mut().optimizer.enable_piecewise_merge_join =
self.enable_piecewise_merge_join;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
// register tables
self.register_tables(&ctx).await?;
for query_id in query_range {
benchmark_run.start_new_case(&format!("Query {query_id}"));
let query_run = self.benchmark_query(query_id, &ctx).await;
match query_run {
Ok(query_results) => {
for iter in query_results {
benchmark_run.write_iter(iter.elapsed, iter.row_count);
}
}
Err(e) => {
benchmark_run.mark_failed();
eprintln!("Query {query_id} failed: {e}");
}
}
}
benchmark_run.maybe_write_json(self.output_path.as_ref())?;
benchmark_run.maybe_print_failures();
Ok(())
}
async fn benchmark_query(
&self,
query_id: usize,
ctx: &SessionContext,
) -> Result<Vec<QueryResult>> {
let mut millis = vec![];
// run benchmark
let mut query_results = vec![];
let sql = &get_query_sql(query_id)?;
for i in 0..self.iterations() {
let start = Instant::now();
// query 15 is special, with 3 statements. the second statement is the one from which we
// want to capture the results
let mut result = vec![];
if query_id == 15 {
for (n, query) in sql.iter().enumerate() {
if n == 1 {
result = self.execute_query(ctx, query).await?;
} else {
self.execute_query(ctx, query).await?;
}
}
} else {
for query in sql {
result = self.execute_query(ctx, query).await?;
}
}
let elapsed = start.elapsed();
let ms = elapsed.as_secs_f64() * 1000.0;
millis.push(ms);
info!("output:\n\n{}\n\n", pretty_format_batches(&result)?);
let row_count = result.iter().map(|b| b.num_rows()).sum();
println!(
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
);
query_results.push(QueryResult { elapsed, row_count });
}
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
println!("Query {query_id} avg time: {avg:.2} ms");
// Print memory stats using mimalloc (only when compiled with --features mimalloc_extended)
print_memory_stats();
Ok(query_results)
}
async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
for table in TPCH_TABLES {
let table_provider = { self.get_table(ctx, table).await? };
if self.mem_table {
println!("Loading table '{table}' into memory");
let start = Instant::now();
let memtable =
MemTable::load(table_provider, Some(self.partitions()), &ctx.state())
.await?;
println!(
"Loaded table '{}' into memory in {} ms",
table,
start.elapsed().as_millis()
);
ctx.register_table(*table, Arc::new(memtable))?;
} else {
ctx.register_table(*table, table_provider)?;
}
}
Ok(())
}
async fn execute_query(
&self,
ctx: &SessionContext,
sql: &str,
) -> Result<Vec<RecordBatch>> {
let debug = self.common.debug;
let plan = ctx.sql(sql).await?;
let (state, plan) = plan.into_parts();
if debug {
println!("=== Logical plan ===\n{plan}\n");
}
let plan = state.optimize(&plan)?;
if debug {
println!("=== Optimized logical plan ===\n{plan}\n");
}
let physical_plan = state.create_physical_plan(&plan).await?;
if debug {
println!(
"=== Physical plan ===\n{}\n",
displayable(physical_plan.as_ref()).indent(true)
);
}
let result = collect(physical_plan.clone(), state.task_ctx()).await?;
if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
.indent(true)
);
if !result.is_empty() {
// do not call print_batches if there are no batches as the result is confusing
// and makes it look like there is a batch with no columns
pretty::print_batches(&result)?;
}
}
Ok(result)
}
async fn get_table(
&self,
ctx: &SessionContext,
table: &str,
) -> Result<Arc<dyn TableProvider>> {
let path = self.path.to_str().unwrap();
let table_format = self.file_format.as_str();
let target_partitions = self.partitions();
// Obtain a snapshot of the SessionState
let state = ctx.state();
let (format, path, extension): (Arc<dyn FileFormat>, String, &'static str) =
match table_format {
// dbgen creates .tbl ('|' delimited) files without header
"tbl" => {
let path = format!("{path}/{table}.tbl");
let format = CsvFormat::default()
.with_delimiter(b'|')
.with_has_header(false);
(Arc::new(format), path, ".tbl")
}
"csv" => {
let path = format!("{path}/csv/{table}");
let format = CsvFormat::default()
.with_delimiter(b',')
.with_has_header(true);
(Arc::new(format), path, DEFAULT_CSV_EXTENSION)
}
"parquet" => {
let path = format!("{path}/{table}");
let format = ParquetFormat::default()
.with_options(ctx.state().table_options().parquet.clone());
(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
other => {
unimplemented!("Invalid file format '{}'", other);
}
};
let table_path = ListingTableUrl::parse(path)?;
let options = ListingOptions::new(format)
.with_file_extension(extension)
.with_target_partitions(target_partitions)
.with_collect_stat(state.config().collect_statistics());
let schema = match table_format {
"parquet" => options.infer_schema(&state, &table_path).await?,
"tbl" => Arc::new(get_tbl_tpch_table_schema(table)),
"csv" => Arc::new(get_tpch_table_schema(table)),
_ => unreachable!(),
};
let options = if self.sorted {
let key_column_name = schema.fields()[0].name();
options
.with_file_sort_order(vec![vec![col(key_column_name).sort(true, false)]])
} else {
options
};
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(schema);
Ok(Arc::new(ListingTable::try_new(config)?))
}
fn iterations(&self) -> usize {
self.common.iterations
}
fn partitions(&self) -> usize {
self.common
.partitions
.unwrap_or_else(get_available_parallelism)
}
}
#[cfg(test)]
// Only run with "ci" mode when we have the data
#[cfg(feature = "ci")]
mod tests {
use std::path::Path;
use super::*;
use datafusion::common::exec_err;
use datafusion::error::Result;
use datafusion_proto::bytes::{
logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes,
physical_plan_to_bytes,
};
fn get_tpch_data_path() -> Result<String> {
let path =
std::env::var("TPCH_DATA").unwrap_or_else(|_| "benchmarks/data".to_string());
if !Path::new(&path).exists() {
return exec_err!(
"Benchmark data not found (set TPCH_DATA env var to override): {}",
path
);
}
Ok(path)
}
async fn round_trip_logical_plan(query: usize) -> Result<()> {
let ctx = SessionContext::default();
let path = get_tpch_data_path()?;
let common = CommonOpt {
iterations: 1,
partitions: Some(2),
batch_size: Some(8192),
mem_pool_type: "fair".to_string(),
memory_limit: None,
sort_spill_reservation_bytes: None,
debug: false,
};
let opt = RunOpt {
query: Some(query),
common,
path: PathBuf::from(path.to_string()),
file_format: "tbl".to_string(),
mem_table: false,
output_path: None,
disable_statistics: false,
prefer_hash_join: true,
enable_piecewise_merge_join: false,
sorted: false,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
for query in queries {
let plan = ctx.sql(&query).await?;
let plan = plan.into_optimized_plan()?;
let bytes = logical_plan_to_bytes(&plan)?;
let plan2 = logical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
let plan_formatted = format!("{}", plan.display_indent());
let plan2_formatted = format!("{}", plan2.display_indent());
assert_eq!(plan_formatted, plan2_formatted);
}
Ok(())
}
async fn round_trip_physical_plan(query: usize) -> Result<()> {
let ctx = SessionContext::default();
let path = get_tpch_data_path()?;
let common = CommonOpt {
iterations: 1,
partitions: Some(2),
batch_size: Some(8192),
mem_pool_type: "fair".to_string(),
memory_limit: None,
sort_spill_reservation_bytes: None,
debug: false,
};
let opt = RunOpt {
query: Some(query),
common,
path: PathBuf::from(path.to_string()),
file_format: "tbl".to_string(),
mem_table: false,
output_path: None,
disable_statistics: false,
prefer_hash_join: true,
enable_piecewise_merge_join: false,
sorted: false,
};
opt.register_tables(&ctx).await?;
let queries = get_query_sql(query)?;
for query in queries {
let plan = ctx.sql(&query).await?;
let plan = plan.create_physical_plan().await?;
let bytes = physical_plan_to_bytes(plan.clone())?;
let plan2 = physical_plan_from_bytes(&bytes, &ctx.task_ctx())?;
let plan_formatted = format!("{}", displayable(plan.as_ref()).indent(false));
let plan2_formatted =
format!("{}", displayable(plan2.as_ref()).indent(false));
assert_eq!(plan_formatted, plan2_formatted);
}
Ok(())
}
macro_rules! test_round_trip_logical {
($tn:ident, $query:expr) => {
#[tokio::test]
async fn $tn() -> Result<()> {
round_trip_logical_plan($query).await
}
};
}
macro_rules! test_round_trip_physical {
($tn:ident, $query:expr) => {
#[tokio::test]
async fn $tn() -> Result<()> {
round_trip_physical_plan($query).await
}
};
}
// logical plan tests
test_round_trip_logical!(round_trip_logical_plan_q1, 1);
test_round_trip_logical!(round_trip_logical_plan_q2, 2);
test_round_trip_logical!(round_trip_logical_plan_q3, 3);
test_round_trip_logical!(round_trip_logical_plan_q4, 4);
test_round_trip_logical!(round_trip_logical_plan_q5, 5);
test_round_trip_logical!(round_trip_logical_plan_q6, 6);
test_round_trip_logical!(round_trip_logical_plan_q7, 7);
test_round_trip_logical!(round_trip_logical_plan_q8, 8);
test_round_trip_logical!(round_trip_logical_plan_q9, 9);
test_round_trip_logical!(round_trip_logical_plan_q10, 10);
test_round_trip_logical!(round_trip_logical_plan_q11, 11);
test_round_trip_logical!(round_trip_logical_plan_q12, 12);
test_round_trip_logical!(round_trip_logical_plan_q13, 13);
test_round_trip_logical!(round_trip_logical_plan_q14, 14);
test_round_trip_logical!(round_trip_logical_plan_q15, 15);
test_round_trip_logical!(round_trip_logical_plan_q16, 16);
test_round_trip_logical!(round_trip_logical_plan_q17, 17);
test_round_trip_logical!(round_trip_logical_plan_q18, 18);
test_round_trip_logical!(round_trip_logical_plan_q19, 19);
test_round_trip_logical!(round_trip_logical_plan_q20, 20);
test_round_trip_logical!(round_trip_logical_plan_q21, 21);
test_round_trip_logical!(round_trip_logical_plan_q22, 22);
// physical plan tests
test_round_trip_physical!(round_trip_physical_plan_q1, 1);
test_round_trip_physical!(round_trip_physical_plan_q2, 2);
test_round_trip_physical!(round_trip_physical_plan_q3, 3);
test_round_trip_physical!(round_trip_physical_plan_q4, 4);
test_round_trip_physical!(round_trip_physical_plan_q5, 5);
test_round_trip_physical!(round_trip_physical_plan_q6, 6);
test_round_trip_physical!(round_trip_physical_plan_q7, 7);
test_round_trip_physical!(round_trip_physical_plan_q8, 8);
test_round_trip_physical!(round_trip_physical_plan_q9, 9);
test_round_trip_physical!(round_trip_physical_plan_q10, 10);
test_round_trip_physical!(round_trip_physical_plan_q11, 11);
test_round_trip_physical!(round_trip_physical_plan_q12, 12);
test_round_trip_physical!(round_trip_physical_plan_q13, 13);
test_round_trip_physical!(round_trip_physical_plan_q14, 14);
test_round_trip_physical!(round_trip_physical_plan_q15, 15);
test_round_trip_physical!(round_trip_physical_plan_q16, 16);
test_round_trip_physical!(round_trip_physical_plan_q17, 17);
test_round_trip_physical!(round_trip_physical_plan_q18, 18);
test_round_trip_physical!(round_trip_physical_plan_q19, 19);
test_round_trip_physical!(round_trip_physical_plan_q20, 20);
test_round_trip_physical!(round_trip_physical_plan_q21, 21);
test_round_trip_physical!(round_trip_physical_plan_q22, 22);
}