// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::path::PathBuf;
use std::process::Command;
use std::{env, fs};

use async_trait::async_trait;
use insta::{glob, Settings};
use insta_cmd::{assert_cmd_snapshot, get_cargo_bin};
use rstest::rstest;
use testcontainers::core::{CmdWaitFor, ExecCommand, Mount};
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, ImageExt, TestcontainersError};
use testcontainers_modules::minio;
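
/// Returns a [`Command`] that runs the `datafusion-cli` binary built by cargo.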
fn cli() -> Command {
    Command::new(get_cargo_bin("datafusion-cli"))
}
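
/// Creates insta [`Settings`] whose filters redact volatile output (elapsed
/// time, CLI version, backtraces) so the snapshots stay stable across runs.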
fn make_settings() -> Settings {
    let mut settings = Settings::clone_current();
    settings.set_prepend_module_to_snapshot(false);
    settings.add_filter(r"Elapsed .* seconds\.", "[ELAPSED]");
    settings.add_filter(r"DataFusion CLI v.*", "[CLI_VERSION]");
    settings.add_filter(r"(?s)backtrace:.*?\n\n\n", "");
    settings
}
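
/// Starts a MinIO container with the DataFusion test data bind-mounted at
/// `/source`, then uses the bundled `mc` client to create a `data` bucket and
/// copy the test files into it. Panics if Docker is unavailable or any setup
/// step fails.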
async fn setup_minio_container() -> ContainerAsync<minio::MinIO> {
    const MINIO_ROOT_USER: &str = "TEST-DataFusionLogin";
    const MINIO_ROOT_PASSWORD: &str = "TEST-DataFusionPassword";

    let data_path =
        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../datafusion/core/tests/data");
    let absolute_data_path = data_path
        .canonicalize()
        .expect("Failed to get absolute path for test data");

    let container = minio::MinIO::default()
        .with_env_var("MINIO_ROOT_USER", MINIO_ROOT_USER)
        .with_env_var("MINIO_ROOT_PASSWORD", MINIO_ROOT_PASSWORD)
        .with_mount(Mount::bind_mount(
            absolute_data_path.to_str().unwrap(),
            "/source",
        ))
        .start()
        .await;

    match container {
        Ok(container) => {
            // Wait for MinIO to become healthy and stage the test files.
            // This is done via the MinIO CLI (`mc`) to avoid adding an S3 client dependency.
            let commands = [
                ExecCommand::new(["/usr/bin/mc", "ready", "local"]),
                ExecCommand::new([
                    "/usr/bin/mc",
                    "alias",
                    "set",
                    "localminio",
                    "http://localhost:9000",
                    MINIO_ROOT_USER,
                    MINIO_ROOT_PASSWORD,
                ]),
                ExecCommand::new(["/usr/bin/mc", "mb", "localminio/data"]),
                ExecCommand::new([
                    "/usr/bin/mc",
                    "cp",
                    "-r",
                    "/source/",
                    "localminio/data/",
                ]),
            ];
            for command in commands {
                let command =
                    command.with_cmd_ready_condition(CmdWaitFor::Exit { code: Some(0) });
                let cmd_ref = format!("{command:?}");
                if let Err(e) = container.exec(command).await {
                    let stdout = container.stdout_to_vec().await.unwrap_or_default();
                    let stderr = container.stderr_to_vec().await.unwrap_or_default();
                    panic!(
                        "Failed to execute command: {}\nError: {}\nStdout: {:?}\nStderr: {:?}",
                        cmd_ref,
                        e,
                        String::from_utf8_lossy(&stdout),
                        String::from_utf8_lossy(&stderr)
                    );
                }
            }
            container
        }
        Err(TestcontainersError::Client(e)) => {
            panic!("Failed to start MinIO container. Ensure Docker is running and accessible: {e}");
        }
        Err(e) => {
            panic!("Failed to start MinIO container: {e}");
        }
    }
}
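
/// Runs once before the tests in this binary (via `ctor`) so that `RUST_LOG`
/// controls logging output during the tests.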
#[cfg(test)]
#[ctor::ctor]
fn init() {
    // Enable RUST_LOG logging configuration for tests
    let _ = env_logger::try_init();
}
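
/// Runs `datafusion-cli` with each set of arguments and snapshots its output;
/// the case name is used as the snapshot suffix.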
#[rstest]
#[case::exec_multiple_statements(
    "statements",
    ["--command", "select 1; select 2;", "-q"],
)]
#[case::exec_backslash(
    "backslash",
    ["--file", "tests/sql/backslash.sql", "--format", "json", "-q"],
)]
#[case::exec_from_files(
    "files",
    ["--file", "tests/sql/select.sql", "-q"],
)]
#[case::set_batch_size(
    "batch_size",
    ["--command", "show datafusion.execution.batch_size", "-q", "-b", "1"],
)]
#[case::default_explain_plan(
    "default_explain_plan",
    // default explain format should be tree
    ["--command", "EXPLAIN SELECT 123"],
)]
#[case::can_see_indent_format(
    "can_see_indent_format",
    // can choose the old explain format too
    ["--command", "EXPLAIN FORMAT indent SELECT 123"],
)]
#[case::change_format_version(
    "change_format_version",
    ["--file", "tests/sql/types_format.sql", "-q"],
)]
#[test]
fn cli_quick_test<'a>(
    #[case] snapshot_name: &'a str,
    #[case] args: impl IntoIterator<Item = &'a str>,
) {
    let mut settings = make_settings();
    settings.set_snapshot_suffix(snapshot_name);
    let _bound = settings.bind_to_scope();

    let mut cmd = cli();
    cmd.args(args);
    assert_cmd_snapshot!(cmd);
}
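
/// Verifies that the `DATAFUSION_EXPLAIN_FORMAT` environment variable
/// overrides the default explain format.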
#[test]
fn cli_explain_environment_overrides() {
    let mut settings = make_settings();
    settings.set_snapshot_suffix("explain_plan_environment_overrides");
    let _bound = settings.bind_to_scope();

    let mut cmd = cli();
    // The environment variable should override the default explain format
    cmd.env("DATAFUSION_EXPLAIN_FORMAT", "pgjson")
        .args(["--command", "EXPLAIN SELECT 123"]);
    assert_cmd_snapshot!(cmd);
}
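
/// Snapshots the output of `select 1` for each supported `--format` value.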
#[rstest]
#[case("csv")]
#[case("tsv")]
#[case("table")]
#[case("json")]
#[case("nd-json")]
#[case("automatic")]
#[test]
fn test_cli_format<'a>(#[case] format: &'a str) {
    let mut settings = make_settings();
    settings.set_snapshot_suffix(format);
    let _bound = settings.bind_to_scope();

    let mut cmd = cli();
    cmd.args(["--command", "select 1", "-q", "--format", format]);
    assert_cmd_snapshot!(cmd);
}
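
/// Runs a query that exceeds the 10M `--memory-limit` and snapshots the
/// resulting error for different `--top-memory-consumers` settings; extra
/// filters normalize the reported allocation sizes.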
#[rstest]
#[case("no_track", ["--top-memory-consumers", "0"])]
#[case("top2", ["--top-memory-consumers", "2"])]
#[case("top3_default", [])]
#[test]
fn test_cli_top_memory_consumers<'a>(
    #[case] snapshot_name: &str,
    #[case] top_memory_consumers: impl IntoIterator<Item = &'a str>,
) {
    let mut settings = make_settings();
    settings.set_snapshot_suffix(snapshot_name);
    settings.add_filter(
        r"[^\s]+\#\d+\(can spill: (true|false)\) consumed .*?B, peak .*?B",
        "Consumer(can spill: bool) consumed XB, peak XB",
    );
    settings.add_filter(
        r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
        "Error: Failed to allocate ",
    );
    settings.add_filter(
        r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
        "Resources exhausted: Failed to allocate",
    );
    let _bound = settings.bind_to_scope();

    let mut cmd = cli();
    let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;";
    cmd.args(["--memory-limit", "10M", "--command", sql]);
    cmd.args(top_memory_consumers);
    assert_cmd_snapshot!(cmd);
}
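
/// Runs every script in `tests/sql/integration/` against a MinIO-backed S3
/// store, passing credentials and endpoint via environment variables.
/// Requires `TEST_STORAGE_INTEGRATION` to be set and Docker to be available.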
#[tokio::test]
async fn test_cli() {
    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
        eprintln!("Skipping external storages integration tests");
        return;
    }

    let container = setup_minio_container().await;

    let settings = make_settings();
    let _bound = settings.bind_to_scope();

    let port = container.get_host_port_ipv4(9000).await.unwrap();

    glob!("sql/integration/*.sql", |path| {
        let input = fs::read_to_string(path).unwrap();
        assert_cmd_snapshot!(cli()
            .env_clear()
            .env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
            .env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
            .env("AWS_ENDPOINT", format!("http://localhost:{port}"))
            .env("AWS_ALLOW_HTTP", "true")
            .pass_stdin(input))
    });
}

#[tokio::test]
async fn test_aws_options() {
    // A separate test is needed to pass the AWS settings as SQL OPTIONS rather than via environment variables
    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
        eprintln!("Skipping external storages integration tests");
        return;
    }

    let settings = make_settings();
    let _bound = settings.bind_to_scope();

    let container = setup_minio_container().await;
    let port = container.get_host_port_ipv4(9000).await.unwrap();

    let input = format!(
        r#"CREATE EXTERNAL TABLE CARS
STORED AS CSV
LOCATION 's3://data/cars.csv'
OPTIONS(
'aws.access_key_id' 'TEST-DataFusionLogin',
'aws.secret_access_key' 'TEST-DataFusionPassword',
'aws.endpoint' 'http://localhost:{port}',
'aws.allow_http' 'true'
);
SELECT * FROM CARS limit 1;
"#
    );

    assert_cmd_snapshot!(cli().env_clear().pass_stdin(input));
}
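
/// Queries a public S3 bucket with `aws.skip_signature` and an explicit
/// `aws.region`, capturing (via `RUST_LOG=warn`) any region resolution
/// warnings in the snapshot.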
#[tokio::test]
async fn test_aws_region_auto_resolution() {
    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
        eprintln!("Skipping external storages integration tests");
        return;
    }

    let mut settings = make_settings();
    settings.add_filter(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", "[TIME]");
    let _bound = settings.bind_to_scope();

    let bucket = "s3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_1.parquet";
    let region = "us-east-1";

    let input = format!(
        r#"CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION '{bucket}'
OPTIONS(
'aws.region' '{region}',
'aws.skip_signature' true
);
SELECT COUNT(*) FROM hits;
"#
    );

    assert_cmd_snapshot!(cli()
        .env("RUST_LOG", "warn")
        .env_remove("AWS_ENDPOINT")
        .pass_stdin(input));
}

/// Ensures that a backtrace is printed when `datafusion-cli` executes a query
/// that triggers an error.
/// Example:
/// RUST_BACKTRACE=1 cargo run --features backtrace -- -c "select pow(1,'foo');"
#[rstest]
#[case("SELECT pow(1,'foo')")]
#[case("SELECT CAST('not_a_number' AS INTEGER);")]
#[cfg(feature = "backtrace")]
fn test_backtrace_output(#[case] query: &str) {
    let mut cmd = cli();
    // Use a command that will cause an error and trigger a backtrace
    cmd.args(["--command", query, "-q"])
        .env("RUST_BACKTRACE", "1"); // Enable backtrace

    let output = cmd.output().expect("Failed to execute command");
    let stdout = String::from_utf8_lossy(&output.stdout);
    let stderr = String::from_utf8_lossy(&output.stderr);
    let combined_output = format!("{}{}", stdout, stderr);

    // Assert that the output includes the literal 'backtrace'
    assert!(
        combined_output.to_lowercase().contains("backtrace"),
        "Expected output to contain 'backtrace', but got stdout: '{}' stderr: '{}'",
        stdout,
        stderr
    );
}
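
/// Ensures that reading from an S3 prefix without a trailing slash falls back
/// from a failing `head()` on the prefix to `list()`-based file discovery.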
#[tokio::test]
async fn test_s3_url_fallback() {
    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
        eprintln!("Skipping external storages integration tests");
        return;
    }

    let container = setup_minio_container().await;

    let mut settings = make_settings();
    settings.set_snapshot_suffix("s3_url_fallback");
    let _bound = settings.bind_to_scope();

    // Create a table using a prefix path (without trailing slash)
    // This should trigger the fallback logic where head() fails on the prefix
    // and list() is used to discover the actual files
    let input = r#"CREATE EXTERNAL TABLE partitioned_data
STORED AS CSV
LOCATION 's3://data/partitioned_csv'
OPTIONS (
'format.has_header' 'false'
);
SELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;
"#;

    assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
}

/// Validate object store profiling output
#[tokio::test]
async fn test_object_store_profiling() {
    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
        eprintln!("Skipping external storages integration tests");
        return;
    }

    let container = setup_minio_container().await;

    let mut settings = make_settings();
    // as the object store profiling contains timestamps and durations, we must
    // filter them out to have stable snapshots
    //
    // Example line to filter:
    // 2025-10-11T12:02:59.722646+00:00 operation=Get duration=0.001495s size=1006 path=cars.csv
    // Output:
    // <TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
    settings.add_filter(
        r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2} operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s (size=\d+\s+)?path=(.*)",
        "<TIMESTAMP> operation=$1 duration=[DURATION] ${2}path=$3",
    );
    // We also need to filter out the summary statistics (anything with an 's' at the end)
    // Example line(s) to filter:
    // | Get | duration | 5.000000s | 5.000000s | 5.000000s | | 1 |
    settings.add_filter(
        r"\| (Get|Put|Delete|List|Head)( +)\| duration \| .*? \| .*? \| .*? \| .*? \| (.*?) \|",
        "| $1$2 | duration | ...NORMALIZED...| $3 |",
    );
    let _bound = settings.bind_to_scope();

    let input = r#"
CREATE EXTERNAL TABLE CARS
STORED AS CSV
LOCATION 's3://data/cars.csv';
-- Initial query should not show any profiling as the object store is not instrumented yet
SELECT * from CARS LIMIT 1;
\object_store_profiling trace
-- Query again to see the full profiling output
SELECT * from CARS LIMIT 1;
\object_store_profiling summary
-- Query again to see the summarized profiling output
SELECT * from CARS LIMIT 1;
\object_store_profiling disabled
-- Final query should not show any profiling as we disabled it again
SELECT * from CARS LIMIT 1;
"#;

    assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
}

/// Extension trait to add the MinIO connection information to a [`Command`]
#[async_trait]
trait MinioCommandExt {
    async fn with_minio(&mut self, container: &ContainerAsync<minio::MinIO>)
        -> &mut Self;
}

#[async_trait]
impl MinioCommandExt for Command {
    async fn with_minio(
        &mut self,
        container: &ContainerAsync<minio::MinIO>,
    ) -> &mut Self {
        let port = container.get_host_port_ipv4(9000).await.unwrap();
        self.env_clear()
            .env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
            .env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
            .env("AWS_ENDPOINT", format!("http://localhost:{port}"))
            .env("AWS_ALLOW_HTTP", "true")
    }
}