| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Command within CLI |
| |
| use crate::cli_context::CliSessionContext; |
| use crate::exec::{exec_and_print, exec_from_lines}; |
| use crate::functions::{Function, display_all_functions}; |
| use crate::print_format::PrintFormat; |
| use crate::print_options::PrintOptions; |
| use clap::ValueEnum; |
| use datafusion::arrow::array::{ArrayRef, StringArray}; |
| use datafusion::arrow::datatypes::{DataType, Field, Schema}; |
| use datafusion::arrow::record_batch::RecordBatch; |
| use datafusion::common::instant::Instant; |
| use datafusion::common::{exec_datafusion_err, exec_err}; |
| use datafusion::error::Result; |
| use std::fs::File; |
| use std::io::BufReader; |
| use std::str::FromStr; |
| use std::sync::Arc; |
| |
/// A backslash meta-command understood by the CLI.
///
/// Variants that carry an `Option` hold the command's optional argument;
/// `None` means the command was entered without one.
#[derive(Debug)]
pub enum Command {
    /// `\q`: leave the CLI.
    Quit,
    /// `\?`: print the table of available commands.
    Help,
    /// `\d`: list tables.
    ListTables,
    /// `\d name`: describe the named table.
    DescribeTableStmt(String),
    /// `\h`: list functions.
    ListFunctions,
    /// `\i filename`: read input from the given file.
    Include(Option<String>),
    /// `\h function`: show details for one function.
    SearchFunctions(String),
    /// `\quiet`: print the quiet flag, or set it when a value is given.
    QuietMode(Option<bool>),
    /// `\pset`: change an output option; the payload is the raw sub-command text.
    OutputFormat(Option<String>),
    /// `\object_store_profiling`: print the profiling mode, or set it when a
    /// mode name is given.
    ObjectStoreProfileMode(Option<String>),
}
| |
/// A parsed `\pset` sub-command.
///
/// Derives `Debug` so values can be logged and used in assertions, in line
/// with the sibling [`Command`] enum.
#[derive(Debug)]
pub enum OutputFormat {
    /// `format VALUE`: switch the output format to the one named by `VALUE`.
    /// The name is stored as entered and only validated on execution.
    ChangeFormat(String),
}
| |
| impl Command { |
| pub async fn execute( |
| &self, |
| ctx: &dyn CliSessionContext, |
| print_options: &mut PrintOptions, |
| ) -> Result<()> { |
| match self { |
| Self::Help => { |
| let now = Instant::now(); |
| let command_batch = all_commands_info(); |
| let schema = command_batch.schema(); |
| let num_rows = command_batch.num_rows(); |
| let task_ctx = ctx.task_ctx(); |
| let config = &task_ctx.session_config().options().format; |
| print_options.print_batches( |
| schema, |
| &[command_batch], |
| now, |
| num_rows, |
| config, |
| ) |
| } |
| Self::ListTables => { |
| exec_and_print(ctx, print_options, "SHOW TABLES".into()).await |
| } |
| Self::DescribeTableStmt(name) => { |
| exec_and_print(ctx, print_options, format!("SHOW COLUMNS FROM {name}")) |
| .await |
| } |
| Self::Include(filename) => { |
| if let Some(filename) = filename { |
| let file = File::open(filename).map_err(|e| { |
| exec_datafusion_err!("Error opening {filename:?} {e}") |
| })?; |
| exec_from_lines(ctx, &mut BufReader::new(file), print_options) |
| .await?; |
| Ok(()) |
| } else { |
| exec_err!("Required filename argument is missing") |
| } |
| } |
| Self::QuietMode(quiet) => { |
| if let Some(quiet) = quiet { |
| print_options.quiet = *quiet; |
| println!( |
| "Quiet mode set to {}", |
| if print_options.quiet { "true" } else { "false" } |
| ); |
| } else { |
| println!( |
| "Quiet mode is {}", |
| if print_options.quiet { "true" } else { "false" } |
| ); |
| } |
| Ok(()) |
| } |
| Self::Quit => exec_err!("Unexpected quit, this should be handled outside"), |
| Self::ListFunctions => display_all_functions(), |
| Self::SearchFunctions(function) => { |
| if let Ok(func) = function.parse::<Function>() { |
| let details = func.function_details()?; |
| println!("{details}"); |
| Ok(()) |
| } else { |
| exec_err!("{function} is not a supported function") |
| } |
| } |
| Self::OutputFormat(_) => exec_err!( |
| "Unexpected change output format, this should be handled outside" |
| ), |
| Self::ObjectStoreProfileMode(mode) => { |
| if let Some(mode) = mode { |
| let profile_mode = mode |
| .parse() |
| .map_err(|_| |
| exec_datafusion_err!("Failed to parse input: {mode}. Valid options are disabled, summary, trace") |
| )?; |
| print_options |
| .instrumented_registry |
| .set_instrument_mode(profile_mode); |
| println!( |
| "ObjectStore Profile mode set to {}", |
| print_options.instrumented_registry.instrument_mode() |
| ); |
| } else { |
| println!( |
| "ObjectStore Profile mode is {}", |
| print_options.instrumented_registry.instrument_mode() |
| ); |
| } |
| |
| Ok(()) |
| } |
| } |
| } |
| |
| fn get_name_and_description(&self) -> (&'static str, &'static str) { |
| match self { |
| Self::Quit => ("\\q", "quit datafusion-cli"), |
| Self::ListTables => ("\\d", "list tables"), |
| Self::DescribeTableStmt(_) => ("\\d name", "describe table"), |
| Self::Help => ("\\?", "help"), |
| Self::Include(_) => { |
| ("\\i filename", "reads input from the specified filename") |
| } |
| Self::ListFunctions => ("\\h", "function list"), |
| Self::SearchFunctions(_) => ("\\h function", "search function"), |
| Self::QuietMode(_) => ("\\quiet (true|false)?", "print or set quiet mode"), |
| Self::OutputFormat(_) => { |
| ("\\pset [NAME [VALUE]]", "set table output option\n(format)") |
| } |
| Self::ObjectStoreProfileMode(_) => ( |
| "\\object_store_profiling (disabled|summary|trace)", |
| "print or set object store profile mode", |
| ), |
| } |
| } |
| } |
| |
| const ALL_COMMANDS: [Command; 10] = [ |
| Command::ListTables, |
| Command::DescribeTableStmt(String::new()), |
| Command::Quit, |
| Command::Help, |
| Command::Include(Some(String::new())), |
| Command::ListFunctions, |
| Command::SearchFunctions(String::new()), |
| Command::QuietMode(None), |
| Command::OutputFormat(None), |
| Command::ObjectStoreProfileMode(None), |
| ]; |
| |
| fn all_commands_info() -> RecordBatch { |
| let schema = Arc::new(Schema::new(vec![ |
| Field::new("Command", DataType::Utf8, false), |
| Field::new("Description", DataType::Utf8, false), |
| ])); |
| let (names, description): (Vec<&str>, Vec<&str>) = ALL_COMMANDS |
| .into_iter() |
| .map(|c| c.get_name_and_description()) |
| .unzip(); |
| RecordBatch::try_new( |
| schema, |
| [names, description] |
| .into_iter() |
| .map(|i| Arc::new(StringArray::from(i)) as ArrayRef) |
| .collect::<Vec<_>>(), |
| ) |
| .expect("This should not fail") |
| } |
| |
| impl FromStr for Command { |
| type Err = (); |
| |
| fn from_str(s: &str) -> Result<Self, Self::Err> { |
| let (c, arg) = if let Some((a, b)) = s.split_once(' ') { |
| (a, Some(b)) |
| } else { |
| (s, None) |
| }; |
| Ok(match (c, arg) { |
| ("q", None) => Self::Quit, |
| ("d", None) => Self::ListTables, |
| ("d", Some(name)) => Self::DescribeTableStmt(name.into()), |
| ("?", None) => Self::Help, |
| ("h", None) => Self::ListFunctions, |
| ("h", Some(function)) => Self::SearchFunctions(function.into()), |
| ("i", None) => Self::Include(None), |
| ("i", Some(filename)) => Self::Include(Some(filename.to_owned())), |
| ("quiet", Some("true" | "t" | "yes" | "y" | "on")) => { |
| Self::QuietMode(Some(true)) |
| } |
| ("quiet", Some("false" | "f" | "no" | "n" | "off")) => { |
| Self::QuietMode(Some(false)) |
| } |
| ("quiet", None) => Self::QuietMode(None), |
| ("pset", Some(subcommand)) => { |
| Self::OutputFormat(Some(subcommand.to_string())) |
| } |
| ("pset", None) => Self::OutputFormat(None), |
| ("object_store_profiling", Some(mode)) => { |
| Self::ObjectStoreProfileMode(Some(mode.to_string())) |
| } |
| ("object_store_profiling", None) => Self::ObjectStoreProfileMode(None), |
| _ => return Err(()), |
| }) |
| } |
| } |
| |
| impl FromStr for OutputFormat { |
| type Err = (); |
| |
| fn from_str(s: &str) -> Result<Self, Self::Err> { |
| let (c, arg) = if let Some((a, b)) = s.split_once(' ') { |
| (a, Some(b)) |
| } else { |
| (s, None) |
| }; |
| Ok(match (c, arg) { |
| ("format", Some(format)) => Self::ChangeFormat(format.to_string()), |
| _ => return Err(()), |
| }) |
| } |
| } |
| |
| impl OutputFormat { |
| pub async fn execute(&self, print_options: &mut PrintOptions) -> Result<()> { |
| match self { |
| Self::ChangeFormat(format) => { |
| if let Ok(format) = format.parse::<PrintFormat>() { |
| print_options.format = format; |
| println!("Output format is {:?}.", print_options.format); |
| Ok(()) |
| } else { |
| exec_err!( |
| "{:?} is not a valid format type [possible values: {:?}]", |
| format, |
| PrintFormat::value_variants() |
| ) |
| } |
| } |
| } |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use datafusion::prelude::SessionContext;

    use crate::{
        object_storage::instrumented::{
            InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
        },
        print_options::MaxRows,
    };

    use super::*;

    /// Checks that `object_store_profiling` reports the default mode when
    /// given no argument, switches mode for each accepted name, and fails on
    /// execution (not on parse) for an unknown name.
    #[tokio::test]
    async fn command_execute_profile_mode() {
        let ctx = SessionContext::new();

        let mut print_options = PrintOptions {
            format: PrintFormat::Automatic,
            quiet: false,
            maxrows: MaxRows::Unlimited,
            color: true,
            instrumented_registry: Arc::new(InstrumentedObjectStoreRegistry::new()),
        };

        // (input, mode expected after executing it), run in this order.
        let accepted = [
            (
                "object_store_profiling",
                InstrumentedObjectStoreMode::default(),
            ),
            (
                "object_store_profiling summary",
                InstrumentedObjectStoreMode::Summary,
            ),
            (
                "object_store_profiling trace",
                InstrumentedObjectStoreMode::Trace,
            ),
        ];
        for (input, expected_mode) in accepted {
            let cmd: Command = input.parse().expect("expected parse to succeed");
            assert!(cmd.execute(&ctx, &mut print_options).await.is_ok());
            assert_eq!(
                print_options.instrumented_registry.instrument_mode(),
                expected_mode
            );
        }

        // An unknown mode still parses as a command; it is rejected on execute.
        let rejected: Command = "object_store_profiling does_not_exist"
            .parse()
            .expect("expected parse to succeed");
        assert!(rejected.execute(&ctx, &mut print_options).await.is_err());
    }
}