// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion::{error::Result, DATAFUSION_VERSION};
use datafusion_common::utils::get_available_parallelism;
use serde::{Serialize, Serializer};
use serde_json::Value;
use std::{
    collections::HashMap,
    path::Path,
    time::{Duration, SystemTime},
};
/// Serializes a [`SystemTime`] as whole seconds since the UNIX epoch.
fn serialize_start_time<S>(start_time: &SystemTime, ser: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    ser.serialize_u64(
        start_time
            .duration_since(SystemTime::UNIX_EPOCH)
            .expect("current time is later than UNIX_EPOCH")
            .as_secs(),
    )
}

/// Serializes a [`Duration`] as fractional milliseconds.
fn serialize_elapsed<S>(elapsed: &Duration, ser: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    let ms = elapsed.as_secs_f64() * 1000.0;
    ser.serialize_f64(ms)
}

#[derive(Debug, Serialize)]
pub struct RunContext {
    /// Benchmark crate version
    pub benchmark_version: String,
    /// DataFusion crate version
    pub datafusion_version: String,
    /// Number of CPU cores
    pub num_cpus: usize,
    /// Start time, serialized as seconds since the UNIX epoch
    #[serde(serialize_with = "serialize_start_time")]
    pub start_time: SystemTime,
    /// CLI arguments
    pub arguments: Vec<String>,
}

impl Default for RunContext {
    fn default() -> Self {
        Self::new()
    }
}

impl RunContext {
    pub fn new() -> Self {
        Self {
            benchmark_version: env!("CARGO_PKG_VERSION").to_owned(),
            datafusion_version: DATAFUSION_VERSION.to_owned(),
            num_cpus: get_available_parallelism(),
            start_time: SystemTime::now(),
            arguments: std::env::args().skip(1).collect::<Vec<String>>(),
        }
    }
}

/// A single iteration of a benchmark query
#[derive(Debug, Serialize)]
struct QueryIter {
    #[serde(serialize_with = "serialize_elapsed")]
    elapsed: Duration,
    row_count: usize,
}

/// A single benchmark case
#[derive(Debug, Serialize)]
pub struct BenchQuery {
    query: String,
    iterations: Vec<QueryIter>,
    #[serde(serialize_with = "serialize_start_time")]
    start_time: SystemTime,
    success: bool,
}

/// Internal representation of a single benchmark query iteration result.
pub struct QueryResult {
    pub elapsed: Duration,
    pub row_count: usize,
}

/// Collects benchmark run data and serializes it to JSON at the end of a run.
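///
/// # Example
///
/// A minimal usage sketch (the query id and output path are illustrative;
/// the block is `ignore`d rather than run as a doctest):
///
/// ```ignore
/// let mut run = BenchmarkRun::new();
/// run.start_new_case("q1");
/// run.write_iter(Duration::from_millis(100), 1000);
/// run.maybe_write_json(Some("results.json"))?;
/// ```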
pub struct BenchmarkRun {
    context: RunContext,
    queries: Vec<BenchQuery>,
    current_case: Option<usize>,
}

impl Default for BenchmarkRun {
    fn default() -> Self {
        Self::new()
    }
}

impl BenchmarkRun {
    /// Create a new, empty `BenchmarkRun`
    pub fn new() -> Self {
        Self {
            context: RunContext::new(),
            queries: vec![],
            current_case: None,
        }
    }

    /// Begin a new case. Iterations added after this will be included in the new case.
    pub fn start_new_case(&mut self, id: &str) {
        self.queries.push(BenchQuery {
            query: id.to_owned(),
            iterations: vec![],
            start_time: SystemTime::now(),
            success: true,
        });
        if let Some(c) = self.current_case.as_mut() {
            *c += 1;
        } else {
            self.current_case = Some(0);
        }
    }

    /// Write a new iteration to the current case
    pub fn write_iter(&mut self, elapsed: Duration, row_count: usize) {
        if let Some(idx) = self.current_case {
            self.queries[idx]
                .iterations
                .push(QueryIter { elapsed, row_count })
        } else {
            panic!("Cannot write iteration: no current case");
        }
    }

    /// Print the names of failed queries, if any
    pub fn maybe_print_failures(&self) {
        let failed_queries: Vec<&str> = self
            .queries
            .iter()
            .filter_map(|q| (!q.success).then_some(q.query.as_str()))
            .collect();

        if !failed_queries.is_empty() {
            println!("Failed Queries: {}", failed_queries.join(", "));
        }
    }

    /// Mark the current query as failed
    pub fn mark_failed(&mut self) {
        if let Some(idx) = self.current_case {
            self.queries[idx].success = false;
        } else {
            unreachable!("Cannot mark failure: no current case");
        }
    }

    /// Stringify the collected data into pretty-printed JSON
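    ///
    /// The output has two top-level keys, `"context"` and `"queries"`; a sketch
    /// of the shape (field values are illustrative, not from a real run):
    ///
    /// ```json
    /// {
    ///   "context": { "benchmark_version": "...", "datafusion_version": "...", "num_cpus": 8, "start_time": 1700000000, "arguments": [] },
    ///   "queries": [
    ///     { "query": "q1", "iterations": [ { "elapsed": 12.5, "row_count": 100 } ], "start_time": 1700000000, "success": true }
    ///   ]
    /// }
    /// ```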
    pub fn to_json(&self) -> String {
        let mut output = HashMap::<&str, Value>::new();
        output.insert("context", serde_json::to_value(&self.context).unwrap());
        output.insert("queries", serde_json::to_value(&self.queries).unwrap());
        serde_json::to_string_pretty(&output).unwrap()
    }

    /// Write the data as JSON to the given output path, if one is provided.
    pub fn maybe_write_json(&self, maybe_path: Option<impl AsRef<Path>>) -> Result<()> {
        if let Some(path) = maybe_path {
            std::fs::write(path, self.to_json())?;
        }
        Ok(())
    }
}
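
// A minimal test sketch exercising the API above: it records one case with a
// single iteration and checks that the pretty-printed JSON parses back with the
// expected top-level keys. The test name and query id are illustrative; only
// items defined in this module plus the existing `serde_json` dependency are used.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn benchmark_run_round_trips_to_json() {
        let mut run = BenchmarkRun::new();
        run.start_new_case("q1");
        run.write_iter(Duration::from_millis(12), 100);

        // Parse the emitted JSON and spot-check the recorded case
        let json: Value = serde_json::from_str(&run.to_json()).unwrap();
        assert!(json.get("context").is_some());
        assert_eq!(json["queries"][0]["query"], "q1");
        assert_eq!(json["queries"][0]["success"], true);
    }
}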