// blob: 93d1d450fd82b3b6ed1fd3e748825ea318d81c79 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::fmt::{Display, Formatter};
use std::io;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use crate::object_storage::instrumented::{
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry, RequestSummaries,
};
use crate::print_format::PrintFormat;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use datafusion::common::instant::Instant;
use datafusion::common::DataFusionError;
use datafusion::error::Result;
use datafusion::physical_plan::RecordBatchStream;
use datafusion::config::FormatOptions;
use futures::StreamExt;
/// Upper bound on the number of rows displayed when printing query results.
///
/// Parsed from text via `FromStr` (a number, or `none` / `inf` / `infinite`)
/// and rendered for users via `Display`.
#[derive(Debug, Clone, PartialEq, Copy)]
pub enum MaxRows {
/// Show all rows in the output; no limit is applied.
Unlimited,
/// Show at most the given number of rows.
Limited(usize),
}
impl FromStr for MaxRows {
    type Err = String;

    /// Parses a `--maxrows` style value.
    ///
    /// `"none"`, `"inf"` and `"infinite"` (in any letter case) yield
    /// [`MaxRows::Unlimited`]; any input that parses as a `usize` yields
    /// [`MaxRows::Limited`].
    ///
    /// # Errors
    ///
    /// Returns a human-readable message naming the rejected input when it is
    /// neither one of the keywords nor a valid `usize`.
    fn from_str(maxrows: &str) -> Result<Self, Self::Err> {
        // The keywords are pure ASCII, so an ASCII case-insensitive comparison
        // is sufficient and avoids allocating a lowercased copy per comparison.
        const UNLIMITED_KEYWORDS: [&str; 3] = ["inf", "infinite", "none"];

        if UNLIMITED_KEYWORDS
            .iter()
            .any(|keyword| maxrows.eq_ignore_ascii_case(keyword))
        {
            return Ok(Self::Unlimited);
        }

        maxrows.parse::<usize>().map(Self::Limited).map_err(|_| {
            format!("Invalid maxrows {maxrows}. Valid inputs are natural numbers or 'none', 'inf', or 'infinite' for no limit.")
        })
    }
}
impl Display for MaxRows {
    /// Renders the limit as user-facing text: `unlimited`, or `at most N`
    /// for a limit of `N` rows.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::Limited(max_rows) => write!(f, "at most {max_rows}"),
            Self::Unlimited => f.write_str("unlimited"),
        }
    }
}
/// Heading line written by `PrintOptions::write_output` before the per-store
/// request details whenever object store instrumentation is not disabled.
const OBJECT_STORE_PROFILING_HEADER: &str = "Object Store Profiling";
/// Settings that control how query results and the trailing execution
/// summary are printed.
#[derive(Debug, Clone)]
pub struct PrintOptions {
/// Output format; `print_batches` and `print_stream` delegate rendering to it.
pub format: PrintFormat,
/// When `true`, `write_output` emits nothing (no execution summary and no
/// object store profiling section).
pub quiet: bool,
/// Row limit forwarded to the formatter by `print_batches`.
pub maxrows: MaxRows,
/// NOTE(review): not read anywhere in this file; presumably toggles colored
/// output elsewhere — confirm against callers.
pub color: bool,
/// Registry queried by `write_output` for the current instrumentation mode
/// and for the requests recorded by each instrumented store.
pub instrumented_registry: Arc<InstrumentedObjectStoreRegistry>,
}
/// Builds the summary text shown after a query's results.
///
/// The text reports `row_count`, then the seconds elapsed since
/// `query_start_time` (three decimal places). When `maxrows` is a limit
/// strictly smaller than `row_count`, a note saying how many rows were
/// displayed is inserted after the row count; otherwise that slot is empty.
fn get_execution_details_formatted(
    row_count: usize,
    maxrows: MaxRows,
    query_start_time: Instant,
) -> String {
    let truncation_note = match maxrows {
        MaxRows::Unlimited => String::new(),
        // The limit did not cut anything off, so there is nothing to mention.
        MaxRows::Limited(nrows) if nrows >= row_count => String::new(),
        MaxRows::Limited(nrows) => {
            format!("(First {nrows} displayed. Use --maxrows to adjust)")
        }
    };
    let elapsed_secs = query_start_time.elapsed().as_secs_f64();

    format!(
        "{} row(s) fetched. {}\nElapsed {:.3} seconds.\n",
        row_count, truncation_note, elapsed_secs
    )
}
impl PrintOptions {
    /// Writes already-collected `batches` to stdout in the configured format,
    /// followed by the execution summary.
    ///
    /// `self.maxrows` is always handed to the formatter. The summary, however,
    /// only reports a row limit when the format is `PrintFormat::Table`; for
    /// every other format it is built as if no limit applied.
    ///
    /// # Errors
    ///
    /// Propagates any error from the formatter or from writing to stdout.
    pub fn print_batches(
        &self,
        schema: SchemaRef,
        batches: &[RecordBatch],
        query_start_time: Instant,
        row_count: usize,
        format_options: &FormatOptions,
    ) -> Result<()> {
        let mut out = io::stdout().lock();

        self.format.print_batches(
            &mut out,
            schema,
            batches,
            self.maxrows,
            true,
            format_options,
        )?;

        let summary_limit = if self.format != PrintFormat::Table {
            MaxRows::Unlimited
        } else {
            self.maxrows
        };
        // Built only after printing so the elapsed time covers the output too.
        let summary =
            get_execution_details_formatted(row_count, summary_limit, query_start_time);

        self.write_output(&mut out, summary)
    }

    /// Drains `stream`, writing each batch to stdout as it arrives, then
    /// appends the execution summary.
    ///
    /// No row limit is applied, and only the first batch is printed with the
    /// header flag set.
    ///
    /// NOTE(review): the stdout lock is held across the `.await` points while
    /// the stream is drained; this likely makes the returned future `!Send` —
    /// confirm that no caller needs to spawn it.
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::External` immediately when the format is
    /// `PrintFormat::Table`. Otherwise propagates the first error yielded by
    /// the stream, the formatter, or the write to stdout.
    pub async fn print_stream(
        &self,
        mut stream: Pin<Box<dyn RecordBatchStream>>,
        query_start_time: Instant,
        format_options: &FormatOptions,
    ) -> Result<()> {
        if self.format == PrintFormat::Table {
            let reason = String::from("PrintFormat::Table is not implemented");
            return Err(DataFusionError::External(reason.into()));
        }

        let mut out = io::stdout().lock();
        let mut total_rows = 0_usize;
        let mut header_pending = true;

        while let Some(batch) = stream.next().await.transpose()? {
            total_rows += batch.num_rows();
            self.format.print_batches(
                &mut out,
                batch.schema(),
                &[batch],
                MaxRows::Unlimited,
                // `true` for the first batch only; cleared for all later ones.
                std::mem::replace(&mut header_pending, false),
                format_options,
            )?;
        }

        let summary = get_execution_details_formatted(
            total_rows,
            MaxRows::Unlimited,
            query_start_time,
        );
        self.write_output(&mut out, summary)
    }

    /// Writes the execution summary and, when object store instrumentation is
    /// enabled, the profiling section to `writer`.
    ///
    /// Nothing at all is written when `self.quiet` is set. Otherwise the
    /// summary comes first. If the instrumentation mode is not `Disabled`, the
    /// profiling header follows, and every store that recorded at least one
    /// request gets its own section: the store itself, the individual requests
    /// (only in `Trace` mode, followed by a blank line), and the request
    /// summaries. The requests are taken from each store via `take_requests`.
    ///
    /// # Errors
    ///
    /// Propagates the first I/O error reported by `writer`.
    fn write_output<W: io::Write>(
        &self,
        writer: &mut W,
        formatted_exec_details: String,
    ) -> Result<()> {
        if self.quiet {
            return Ok(());
        }
        writeln!(writer, "{formatted_exec_details}")?;

        let instrument_mode = self.instrumented_registry.instrument_mode();
        if instrument_mode == InstrumentedObjectStoreMode::Disabled {
            return Ok(());
        }
        let trace_requests = instrument_mode == InstrumentedObjectStoreMode::Trace;

        writeln!(writer, "{OBJECT_STORE_PROFILING_HEADER}")?;
        for store in self.instrumented_registry.stores() {
            let requests = store.take_requests();
            if requests.is_empty() {
                continue;
            }

            writeln!(writer, "{store}")?;
            if trace_requests {
                for req in requests.iter() {
                    writeln!(writer, "{req}")?;
                }
                // Blank line to visually separate the request list from the
                // summaries that follow.
                writeln!(writer)?;
            }

            writeln!(writer, "Summaries:")?;
            let summaries = RequestSummaries::new(&requests);
            writeln!(writer, "{summaries}")?;
        }

        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use datafusion::error::Result;

    use super::*;

    /// Views the captured output as UTF-8 text, panicking if it is not valid.
    fn rendered(bytes: &[u8]) -> &str {
        std::str::from_utf8(bytes).expect("Expected successful String conversion")
    }

    /// Checks `PrintOptions::write_output` in quiet mode, in normal mode, and
    /// with object store tracing switched on.
    #[test]
    fn write_output() -> Result<()> {
        let registry = Arc::new(InstrumentedObjectStoreRegistry::new());
        let mut options = PrintOptions {
            format: PrintFormat::Automatic,
            quiet: true,
            maxrows: MaxRows::Unlimited,
            color: true,
            instrumented_registry: Arc::clone(&registry),
        };
        let details = "Formatted Exec Output";
        let mut sink: Vec<u8> = Vec::new();

        // Quiet mode must not write anything.
        options.write_output(&mut sink, details.to_string())?;
        assert!(sink.is_empty());

        // With quiet off, the execution details are written.
        options.quiet = false;
        options.write_output(&mut sink, details.to_string())?;
        assert!(rendered(&sink).contains(details));

        // Start from an empty buffer so earlier output cannot satisfy the
        // assertions below.
        sink.clear();
        options
            .instrumented_registry
            .set_instrument_mode(InstrumentedObjectStoreMode::Trace);
        options.write_output(&mut sink, details.to_string())?;

        let text = rendered(&sink);
        assert!(text.contains(details));
        assert!(text.contains(OBJECT_STORE_PROFILING_HEADER));

        Ok(())
    }
}