blob: 4da6921ba53c5a6d0b325392e32109e9986b8522 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Runtime configuration, via [`ConfigOptions`]
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::fmt::{self, Display};
use std::str::FromStr;
use crate::error::_config_err;
use crate::parsers::CompressionTypeVariant;
use crate::utils::get_available_parallelism;
use crate::{DataFusionError, Result};
/// A macro that wraps a configuration struct and automatically derives
/// [`Default`] and [`ConfigField`] for it, allowing it to be used
/// in the [`ConfigOptions`] configuration tree.
///
/// `transform` is used to normalize values before parsing.
///
/// For example,
///
/// ```ignore
/// config_namespace! {
/// /// Amazing config
/// pub struct MyConfig {
/// /// Field 1 doc
/// field1: String, transform = str::to_lowercase, default = "".to_string()
///
/// /// Field 2 doc
/// field2: usize, default = 232
///
/// /// Field 3 doc
/// field3: Option<usize>, default = None
/// }
///}
/// ```
///
/// Will generate
///
/// ```ignore
/// /// Amazing config
/// #[derive(Debug, Clone)]
/// #[non_exhaustive]
/// pub struct MyConfig {
/// /// Field 1 doc
/// field1: String,
/// /// Field 2 doc
/// field2: usize,
/// /// Field 3 doc
/// field3: Option<usize>,
/// }
/// impl ConfigField for MyConfig {
/// fn set(&mut self, key: &str, value: &str) -> Result<()> {
/// let (key, rem) = key.split_once('.').unwrap_or((key, ""));
/// match key {
/// "field1" => {
/// let value = str::to_lowercase(value);
/// self.field1.set(rem, value.as_ref())
/// },
/// "field2" => self.field2.set(rem, value.as_ref()),
/// "field3" => self.field3.set(rem, value.as_ref()),
/// _ => _internal_err!(
/// "Config value \"{}\" not found on MyConfig",
/// key
/// ),
/// }
/// }
///
/// fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
/// let key = format!("{}.field1", key_prefix);
/// let desc = "Field 1 doc";
/// self.field1.visit(v, key.as_str(), desc);
/// let key = format!("{}.field2", key_prefix);
/// let desc = "Field 2 doc";
/// self.field2.visit(v, key.as_str(), desc);
/// let key = format!("{}.field3", key_prefix);
/// let desc = "Field 3 doc";
/// self.field3.visit(v, key.as_str(), desc);
/// }
/// }
///
/// impl Default for MyConfig {
/// fn default() -> Self {
/// Self {
/// field1: "".to_string(),
/// field2: 232,
/// field3: None,
/// }
/// }
/// }
/// ```
///
/// NB: Misplaced commas may result in nonsensical errors
#[macro_export]
macro_rules! config_namespace {
(
$(#[doc = $struct_d:tt])*
$vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])*
$field_vis:vis $field_name:ident : $field_type:ty, $(warn = $warn: expr,)? $(transform = $transform:expr,)? default = $default:expr
)*$(,)*
}
) => {
$(#[doc = $struct_d])*
#[derive(Debug, Clone, PartialEq)]
$vis struct $struct_name{
$(
$(#[doc = $d])*
$field_vis $field_name : $field_type,
)*
}
impl ConfigField for $struct_name {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
$(
stringify!($field_name) => {
$(let value = $transform(value);)?
$(log::warn!($warn);)?
self.$field_name.set(rem, value.as_ref())
},
)*
_ => return _config_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
)
}
}
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
$(
let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
let desc = concat!($($d),*).trim();
self.$field_name.visit(v, key.as_str(), desc);
)*
}
}
impl Default for $struct_name {
fn default() -> Self {
Self {
$($field_name: $default),*
}
}
}
}
}
config_namespace! {
/// Options related to catalog and directory scanning
///
/// See also: [`SessionConfig`]
///
/// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
pub struct CatalogOptions {
/// Whether the default catalog and schema should be created automatically.
pub create_default_catalog_and_schema: bool, default = true
/// The default catalog name - this impacts what SQL queries use if not specified
pub default_catalog: String, default = "datafusion".to_string()
/// The default schema name - this impacts what SQL queries use if not specified
pub default_schema: String, default = "public".to_string()
/// Should DataFusion provide access to `information_schema`
/// virtual tables for displaying schema information
pub information_schema: bool, default = false
/// Location scanned to load tables for `default` schema
pub location: Option<String>, default = None
/// Type of `TableProvider` to use when loading `default` schema
pub format: Option<String>, default = None
/// Default value for `format.has_header` for `CREATE EXTERNAL TABLE`
/// if not specified explicitly in the statement.
pub has_header: bool, default = true
/// Specifies whether newlines in (quoted) CSV values are supported.
///
/// This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE`
/// if not specified explicitly in the statement.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
pub newlines_in_values: bool, default = false
}
}
config_namespace! {
/// Options related to SQL parser
///
/// See also: [`SessionConfig`]
///
/// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
pub struct SqlParserOptions {
/// When set to true, SQL parser will parse float as decimal type
pub parse_float_as_decimal: bool, default = false
/// When set to true, SQL parser will normalize ident (convert ident to lowercase when not quoted)
pub enable_ident_normalization: bool, default = true
/// When set to true, SQL parser will normalize options value (convert value to lowercase).
/// Note that this option is ignored and will be removed in the future. All case-insensitive values
/// are normalized automatically.
pub enable_options_value_normalization: bool, warn = "`enable_options_value_normalization` is deprecated and ignored", default = false
/// Configure the SQL dialect used by DataFusion's parser; supported values include: Generic,
/// MySQL, PostgreSQL, Hive, SQLite, Snowflake, Redshift, MsSQL, ClickHouse, BigQuery, and Ansi.
pub dialect: String, default = "generic".to_string()
// no need to lowercase because `sqlparser::dialect_from_str`] is case-insensitive
/// If true, permit lengths for `VARCHAR` such as `VARCHAR(20)`, but
/// ignore the length. If false, error if a `VARCHAR` with a length is
/// specified. The Arrow type system does not have a notion of maximum
/// string length and thus DataFusion can not enforce such limits.
pub support_varchar_with_length: bool, default = true
}
}
config_namespace! {
/// Options related to query execution
///
/// See also: [`SessionConfig`]
///
/// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
pub struct ExecutionOptions {
/// Default batch size while creating new batches, it's especially useful for
/// buffer-in-memory batches since creating tiny batches would result in too much
/// metadata memory consumption
pub batch_size: usize, default = 8192
/// When set to true, record batches will be examined between each operator and
/// small batches will be coalesced into larger batches. This is helpful when there
/// are highly selective filters or joins that could produce tiny output batches. The
/// target batch size is determined by the configuration setting
pub coalesce_batches: bool, default = true
/// Should DataFusion collect statistics after listing files
pub collect_statistics: bool, default = false
/// Number of partitions for query execution. Increasing partitions can increase
/// concurrency.
///
/// Defaults to the number of CPU cores on the system
pub target_partitions: usize, default = get_available_parallelism()
/// The default time zone
///
/// Some functions, e.g. `EXTRACT(HOUR from SOME_TIME)`, shift the underlying datetime
/// according to this time zone, and then extract the hour
pub time_zone: Option<String>, default = Some("+00:00".into())
/// Parquet options
pub parquet: ParquetOptions, default = Default::default()
/// Fan-out during initial physical planning.
///
/// This is mostly use to plan `UNION` children in parallel.
///
/// Defaults to the number of CPU cores on the system
pub planning_concurrency: usize, default = get_available_parallelism()
/// When set to true, skips verifying that the schema produced by
/// planning the input of `LogicalPlan::Aggregate` exactly matches the
/// schema of the input plan.
///
/// When set to false, if the schema does not match exactly
/// (including nullability and metadata), a planning error will be raised.
///
/// This is used to workaround bugs in the planner that are now caught by
/// the new schema verification step.
pub skip_physical_aggregate_schema_check: bool, default = false
/// Specifies the reserved memory for each spillable sort operation to
/// facilitate an in-memory merge.
///
/// When a sort operation spills to disk, the in-memory data must be
/// sorted and merged before being written to a file. This setting reserves
/// a specific amount of memory for that in-memory sort/merge process.
///
/// Note: This setting is irrelevant if the sort operation cannot spill
/// (i.e., if there's no `DiskManager` configured).
pub sort_spill_reservation_bytes: usize, default = 10 * 1024 * 1024
/// When sorting, below what size should data be concatenated
/// and sorted in a single RecordBatch rather than sorted in
/// batches and merged.
pub sort_in_place_threshold_bytes: usize, default = 1024 * 1024
/// Number of files to read in parallel when inferring schema and statistics
pub meta_fetch_concurrency: usize, default = 32
/// Guarantees a minimum level of output files running in parallel.
/// RecordBatches will be distributed in round robin fashion to each
/// parallel writer. Each writer is closed and a new file opened once
/// soft_max_rows_per_output_file is reached.
pub minimum_parallel_output_files: usize, default = 4
/// Target number of rows in output files when writing multiple.
/// This is a soft max, so it can be exceeded slightly. There also
/// will be one file smaller than the limit if the total
/// number of rows written is not roughly divisible by the soft max
pub soft_max_rows_per_output_file: usize, default = 50000000
/// This is the maximum number of RecordBatches buffered
/// for each output file being worked. Higher values can potentially
/// give faster write performance at the cost of higher peak
/// memory consumption
pub max_buffered_batches_per_output_file: usize, default = 2
/// Should sub directories be ignored when scanning directories for data
/// files. Defaults to true (ignores subdirectories), consistent with
/// Hive. Note that this setting does not affect reading partitioned
/// tables (e.g. `/table/year=2021/month=01/data.parquet`).
pub listing_table_ignore_subdirectory: bool, default = true
/// Should DataFusion support recursive CTEs
pub enable_recursive_ctes: bool, default = true
/// Attempt to eliminate sorts by packing & sorting files with non-overlapping
/// statistics into the same file groups.
/// Currently experimental
pub split_file_groups_by_statistics: bool, default = false
/// Should DataFusion keep the columns used for partition_by in the output RecordBatches
pub keep_partition_by_columns: bool, default = false
/// Aggregation ratio (number of distinct groups / number of input rows)
/// threshold for skipping partial aggregation. If the value is greater
/// then partial aggregation will skip aggregation for further input
pub skip_partial_aggregation_probe_ratio_threshold: f64, default = 0.8
/// Number of input rows partial aggregation partition should process, before
/// aggregation ratio check and trying to switch to skipping aggregation mode
pub skip_partial_aggregation_probe_rows_threshold: usize, default = 100_000
/// Should DataFusion use row number estimates at the input to decide
/// whether increasing parallelism is beneficial or not. By default,
/// only exact row numbers (not estimates) are used for this decision.
/// Setting this flag to `true` will likely produce better plans.
/// if the source of statistics is accurate.
/// We plan to make this the default in the future.
pub use_row_number_estimates_to_optimize_partitioning: bool, default = false
/// Should DataFusion enforce batch size in joins or not. By default,
/// DataFusion will not enforce batch size in joins. Enforcing batch size
/// in joins can reduce memory usage when joining large
/// tables with a highly-selective join filter, but is also slightly slower.
pub enforce_batch_size_in_joins: bool, default = false
}
}
config_namespace! {
/// Options for reading and writing parquet files
///
/// See also: [`SessionConfig`]
///
/// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
pub struct ParquetOptions {
// The following options affect reading parquet files
/// (reading) If true, reads the Parquet data page level metadata (the
/// Page Index), if present, to reduce the I/O and number of
/// rows decoded.
pub enable_page_index: bool, default = true
/// (reading) If true, the parquet reader attempts to skip entire row groups based
/// on the predicate in the query and the metadata (min/max values) stored in
/// the parquet file
pub pruning: bool, default = true
/// (reading) If true, the parquet reader skip the optional embedded metadata that may be in
/// the file Schema. This setting can help avoid schema conflicts when querying
/// multiple parquet files with schemas containing compatible types but different metadata
pub skip_metadata: bool, default = true
/// (reading) If specified, the parquet reader will try and fetch the last `size_hint`
/// bytes of the parquet file optimistically. If not specified, two reads are required:
/// One read to fetch the 8-byte parquet footer and
/// another to fetch the metadata length encoded in the footer
pub metadata_size_hint: Option<usize>, default = None
/// (reading) If true, filter expressions are be applied during the parquet decoding operation to
/// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
pub pushdown_filters: bool, default = false
/// (reading) If true, filter expressions evaluated during the parquet decoding operation
/// will be reordered heuristically to minimize the cost of evaluation. If false,
/// the filters are applied in the same order as written in the query
pub reorder_filters: bool, default = false
/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
pub schema_force_view_types: bool, default = true
/// (reading) If true, parquet reader will read columns of
/// `Binary/LargeBinary` with `Utf8`, and `BinaryView` with `Utf8View`.
///
/// Parquet files generated by some legacy writers do not correctly set
/// the UTF8 flag for strings, causing string columns to be loaded as
/// BLOB instead.
pub binary_as_string: bool, default = false
// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties
/// (writing) Sets best effort maximum size of data page in bytes
pub data_pagesize_limit: usize, default = 1024 * 1024
/// (writing) Sets write_batch_size in bytes
pub write_batch_size: usize, default = 1024
/// (writing) Sets parquet writer version
/// valid values are "1.0" and "2.0"
pub writer_version: String, default = "1.0".to_string()
/// (writing) Skip encoding the embedded arrow metadata in the KV_meta
///
/// This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`.
/// Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
pub skip_arrow_metadata: bool, default = false
/// (writing) Sets default parquet compression codec.
/// Valid values are: uncompressed, snappy, gzip(level),
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
///
/// Note that this default setting is not the same as
/// the default parquet writer setting.
pub compression: Option<String>, transform = str::to_lowercase, default = Some("zstd(3)".into())
/// (writing) Sets if dictionary encoding is enabled. If NULL, uses
/// default parquet writer setting
pub dictionary_enabled: Option<bool>, default = Some(true)
/// (writing) Sets best effort maximum dictionary page size, in bytes
pub dictionary_page_size_limit: usize, default = 1024 * 1024
/// (writing) Sets if statistics are enabled for any column
/// Valid values are: "none", "chunk", and "page"
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
pub statistics_enabled: Option<String>, transform = str::to_lowercase, default = Some("page".into())
/// (writing) Sets max statistics size for any column. If NULL, uses
/// default parquet writer setting
pub max_statistics_size: Option<usize>, default = Some(4096)
/// (writing) Target maximum number of rows in each row group (defaults to 1M
/// rows). Writing larger row groups requires more memory to write, but
/// can get better compression and be faster to read.
pub max_row_group_size: usize, default = 1024 * 1024
/// (writing) Sets "created by" property
pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()
/// (writing) Sets column index truncate length
pub column_index_truncate_length: Option<usize>, default = Some(64)
/// (writing) Sets best effort maximum number of rows in data page
pub data_page_row_count_limit: usize, default = 20_000
/// (writing) Sets default encoding for any column.
/// Valid values are: plain, plain_dictionary, rle,
/// bit_packed, delta_binary_packed, delta_length_byte_array,
/// delta_byte_array, rle_dictionary, and byte_stream_split.
/// These values are not case sensitive. If NULL, uses
/// default parquet writer setting
pub encoding: Option<String>, transform = str::to_lowercase, default = None
/// (writing) Use any available bloom filters when reading parquet files
pub bloom_filter_on_read: bool, default = true
/// (writing) Write bloom filters for all columns when creating parquet files
pub bloom_filter_on_write: bool, default = false
/// (writing) Sets bloom filter false positive probability. If NULL, uses
/// default parquet writer setting
pub bloom_filter_fpp: Option<f64>, default = None
/// (writing) Sets bloom filter number of distinct values. If NULL, uses
/// default parquet writer setting
pub bloom_filter_ndv: Option<u64>, default = None
/// (writing) Controls whether DataFusion will attempt to speed up writing
/// parquet files by serializing them in parallel. Each column
/// in each row group in each output file are serialized in parallel
/// leveraging a maximum possible core count of n_files*n_row_groups*n_columns.
pub allow_single_file_parallelism: bool, default = true
/// (writing) By default parallel parquet writer is tuned for minimum
/// memory usage in a streaming execution plan. You may see
/// a performance benefit when writing large parquet files
/// by increasing maximum_parallel_row_group_writers and
/// maximum_buffered_record_batches_per_stream if your system
/// has idle cores and can tolerate additional memory usage.
/// Boosting these values is likely worthwhile when
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_parallel_row_group_writers: usize, default = 1
/// (writing) By default parallel parquet writer is tuned for minimum
/// memory usage in a streaming execution plan. You may see
/// a performance benefit when writing large parquet files
/// by increasing maximum_parallel_row_group_writers and
/// maximum_buffered_record_batches_per_stream if your system
/// has idle cores and can tolerate additional memory usage.
/// Boosting these values is likely worthwhile when
/// writing out already in-memory data, such as from a cached
/// data frame.
pub maximum_buffered_record_batches_per_stream: usize, default = 2
}
}
config_namespace! {
/// Options related to query optimization
///
/// See also: [`SessionConfig`]
///
/// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
pub struct OptimizerOptions {
/// When set to true, the optimizer will push a limit operation into
/// grouped aggregations which have no aggregate expressions, as a soft limit,
/// emitting groups once the limit is reached, before all rows in the group are read.
pub enable_distinct_aggregation_soft_limit: bool, default = true
/// When set to true, the physical plan optimizer will try to add round robin
/// repartitioning to increase parallelism to leverage more CPU cores
pub enable_round_robin_repartition: bool, default = true
/// When set to true, the optimizer will attempt to perform limit operations
/// during aggregations, if possible
pub enable_topk_aggregation: bool, default = true
/// When set to true, the optimizer will insert filters before a join between
/// a nullable and non-nullable column to filter out nulls on the nullable side. This
/// filter can add additional overhead when the file format does not fully support
/// predicate push down.
pub filter_null_join_keys: bool, default = false
/// Should DataFusion repartition data using the aggregate keys to execute aggregates
/// in parallel using the provided `target_partitions` level
pub repartition_aggregations: bool, default = true
/// Minimum total files size in bytes to perform file scan repartitioning.
pub repartition_file_min_size: usize, default = 10 * 1024 * 1024
/// Should DataFusion repartition data using the join keys to execute joins in parallel
/// using the provided `target_partitions` level
pub repartition_joins: bool, default = true
/// Should DataFusion allow symmetric hash joins for unbounded data sources even when
/// its inputs do not have any ordering or filtering If the flag is not enabled,
/// the SymmetricHashJoin operator will be unable to prune its internal buffers,
/// resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right,
/// RightAnti, and RightSemi - being produced only at the end of the execution.
/// This is not typical in stream processing. Additionally, without proper design for
/// long runner execution, all types of joins may encounter out-of-memory errors.
pub allow_symmetric_joins_without_pruning: bool, default = true
/// When set to `true`, file groups will be repartitioned to achieve maximum parallelism.
/// Currently Parquet and CSV formats are supported.
///
/// If set to `true`, all files will be repartitioned evenly (i.e., a single large file
/// might be partitioned into smaller chunks) for parallel scanning.
/// If set to `false`, different files will be read in parallel, but repartitioning won't
/// happen within a single file.
pub repartition_file_scans: bool, default = true
/// Should DataFusion repartition data using the partitions keys to execute window
/// functions in parallel using the provided `target_partitions` level
pub repartition_windows: bool, default = true
/// Should DataFusion execute sorts in a per-partition fashion and merge
/// afterwards instead of coalescing first and sorting globally.
/// With this flag is enabled, plans in the form below
///
/// ```text
/// "SortExec: [a@0 ASC]",
/// " CoalescePartitionsExec",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
/// would turn into the plan below which performs better in multithreaded environments
///
/// ```text
/// "SortPreservingMergeExec: [a@0 ASC]",
/// " SortExec: [a@0 ASC]",
/// " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
/// ```
pub repartition_sorts: bool, default = true
/// When true, DataFusion will opportunistically remove sorts when the data is already sorted,
/// (i.e. setting `preserve_order` to true on `RepartitionExec` and
/// using `SortPreservingMergeExec`)
///
/// When false, DataFusion will maximize plan parallelism using
/// `RepartitionExec` even if this requires subsequently resorting data using a `SortExec`.
pub prefer_existing_sort: bool, default = false
/// When set to true, the logical plan optimizer will produce warning
/// messages if any optimization rules produce errors and then proceed to the next
/// rule. When set to false, any rules that produce errors will cause the query to fail
pub skip_failed_rules: bool, default = false
/// Number of times that the optimizer will attempt to optimize the plan
pub max_passes: usize, default = 3
/// When set to true, the physical plan optimizer will run a top down
/// process to reorder the join keys
pub top_down_join_key_reordering: bool, default = true
/// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
/// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
pub prefer_hash_join: bool, default = true
/// The maximum estimated size in bytes for one input side of a HashJoin
/// will be collected into a single partition
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
/// The maximum estimated size in rows for one input side of a HashJoin
/// will be collected into a single partition
pub hash_join_single_partition_threshold_rows: usize, default = 1024 * 128
/// The default filter selectivity used by Filter Statistics
/// when an exact selectivity cannot be determined. Valid values are
/// between 0 (no selectivity) and 100 (all rows are selected).
pub default_filter_selectivity: u8, default = 20
/// When set to true, the optimizer will not attempt to convert Union to Interleave
pub prefer_existing_union: bool, default = false
/// When set to true, if the returned type is a view type
/// then the output will be coerced to a non-view.
/// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
pub expand_views_at_output: bool, default = false
}
}
config_namespace! {
/// Options controlling explain output
///
/// See also: [`SessionConfig`]
///
/// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
pub struct ExplainOptions {
/// When set to true, the explain statement will only print logical plans
pub logical_plan_only: bool, default = false
/// When set to true, the explain statement will only print physical plans
pub physical_plan_only: bool, default = false
/// When set to true, the explain statement will print operator statistics
/// for physical plans
pub show_statistics: bool, default = false
/// When set to true, the explain statement will print the partition sizes
pub show_sizes: bool, default = true
/// When set to true, the explain statement will print schema information
pub show_schema: bool, default = false
}
}
/// A key value pair, with a corresponding description
#[derive(Debug)]
pub struct ConfigEntry {
/// A unique string to identify this config value
pub key: String,
/// The value if any
pub value: Option<String>,
/// A description of this configuration entry
pub description: &'static str,
}
/// Configuration options struct, able to store both built-in configuration and custom options
#[derive(Debug, Clone, Default)]
#[non_exhaustive]
pub struct ConfigOptions {
/// Catalog options
pub catalog: CatalogOptions,
/// Execution options
pub execution: ExecutionOptions,
/// Optimizer options
pub optimizer: OptimizerOptions,
/// SQL parser options
pub sql_parser: SqlParserOptions,
/// Explain options
pub explain: ExplainOptions,
/// Optional extensions registered using [`Extensions::insert`]
pub extensions: Extensions,
}
impl ConfigField for ConfigOptions {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
// Extensions are handled in the public `ConfigOptions::set`
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
"catalog" => self.catalog.set(rem, value),
"execution" => self.execution.set(rem, value),
"optimizer" => self.optimizer.set(rem, value),
"explain" => self.explain.set(rem, value),
"sql_parser" => self.sql_parser.set(rem, value),
_ => _config_err!("Config value \"{key}\" not found on ConfigOptions"),
}
}
fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
self.catalog.visit(v, "datafusion.catalog", "");
self.execution.visit(v, "datafusion.execution", "");
self.optimizer.visit(v, "datafusion.optimizer", "");
self.explain.visit(v, "datafusion.explain", "");
self.sql_parser.visit(v, "datafusion.sql_parser", "");
}
}
impl ConfigOptions {
/// Creates a new [`ConfigOptions`] with default values
pub fn new() -> Self {
Self::default()
}
/// Set extensions to provided value
pub fn with_extensions(mut self, extensions: Extensions) -> Self {
self.extensions = extensions;
self
}
/// Set a configuration option
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
let Some((prefix, key)) = key.split_once('.') else {
return _config_err!("could not find config namespace for key \"{key}\"");
};
if prefix == "datafusion" {
return ConfigField::set(self, key, value);
}
let Some(e) = self.extensions.0.get_mut(prefix) else {
return _config_err!("Could not find config namespace \"{prefix}\"");
};
e.0.set(key, value)
}
/// Create new ConfigOptions struct, taking values from
/// environment variables where possible.
///
/// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
/// control `datafusion.execution.batch_size`.
pub fn from_env() -> Result<Self> {
struct Visitor(Vec<String>);
impl Visit for Visitor {
fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
self.0.push(key.to_string())
}
fn none(&mut self, key: &str, _: &'static str) {
self.0.push(key.to_string())
}
}
// Extract the names of all fields and then look up the corresponding
// environment variables. This isn't hugely efficient but avoids
// ambiguity between `a.b` and `a_b` which would both correspond
// to an environment variable of `A_B`
let mut keys = Visitor(vec![]);
let mut ret = Self::default();
ret.visit(&mut keys, "datafusion", "");
for key in keys.0 {
let env = key.to_uppercase().replace('.', "_");
if let Some(var) = std::env::var_os(env) {
ret.set(&key, var.to_string_lossy().as_ref())?;
}
}
Ok(ret)
}
/// Create new ConfigOptions struct, taking values from a string hash map.
///
/// Only the built-in configurations will be extracted from the hash map
/// and other key value pairs will be ignored.
pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
struct Visitor(Vec<String>);
impl Visit for Visitor {
fn some<V: Display>(&mut self, key: &str, _: V, _: &'static str) {
self.0.push(key.to_string())
}
fn none(&mut self, key: &str, _: &'static str) {
self.0.push(key.to_string())
}
}
let mut keys = Visitor(vec![]);
let mut ret = Self::default();
ret.visit(&mut keys, "datafusion", "");
for key in keys.0 {
if let Some(var) = settings.get(&key) {
ret.set(&key, var)?;
}
}
Ok(ret)
}
/// Returns the [`ConfigEntry`] stored within this [`ConfigOptions`]
pub fn entries(&self) -> Vec<ConfigEntry> {
struct Visitor(Vec<ConfigEntry>);
impl Visit for Visitor {
fn some<V: Display>(
&mut self,
key: &str,
value: V,
description: &'static str,
) {
self.0.push(ConfigEntry {
key: key.to_string(),
value: Some(value.to_string()),
description,
})
}
fn none(&mut self, key: &str, description: &'static str) {
self.0.push(ConfigEntry {
key: key.to_string(),
value: None,
description,
})
}
}
let mut v = Visitor(vec![]);
self.visit(&mut v, "datafusion", "");
v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
v.0
}
/// Generate documentation that can be included in the user guide
pub fn generate_config_markdown() -> String {
use std::fmt::Write as _;
let mut s = Self::default();
// Normalize for display
s.execution.target_partitions = 0;
s.execution.planning_concurrency = 0;
let mut docs = "| key | default | description |\n".to_string();
docs += "|-----|---------|-------------|\n";
let mut entries = s.entries();
entries.sort_unstable_by(|a, b| a.key.cmp(&b.key));
for entry in s.entries() {
let _ = writeln!(
&mut docs,
"| {} | {} | {} |",
entry.key,
entry.value.as_deref().unwrap_or("NULL"),
entry.description
);
}
docs
}
}
/// [`ConfigExtension`] provides a mechanism to store third-party configuration within DataFusion
///
/// Unfortunately associated constants are not currently object-safe, and so this
/// extends the object-safe [`ExtensionOptions`]
pub trait ConfigExtension: ExtensionOptions {
/// Configuration namespace prefix to use
///
/// All values under this will be prefixed with `$PREFIX + "."`
const PREFIX: &'static str;
}
/// An object-safe API for storing arbitrary configuration
pub trait ExtensionOptions: Send + Sync + fmt::Debug + 'static {
/// Return `self` as [`Any`]
///
/// This is needed until trait upcasting is stabilized
fn as_any(&self) -> &dyn Any;
/// Return `self` as [`Any`]
///
/// This is needed until trait upcasting is stabilized
fn as_any_mut(&mut self) -> &mut dyn Any;
/// Return a deep clone of this [`ExtensionOptions`]
///
/// It is important this does not share mutable state to avoid consistency issues
/// with configuration changing whilst queries are executing
fn cloned(&self) -> Box<dyn ExtensionOptions>;
/// Set the given `key`, `value` pair
fn set(&mut self, key: &str, value: &str) -> Result<()>;
/// Returns the [`ConfigEntry`] stored in this [`ExtensionOptions`]
fn entries(&self) -> Vec<ConfigEntry>;
}
/// A type-safe container for [`ConfigExtension`]
#[derive(Debug, Default, Clone)]
pub struct Extensions(BTreeMap<&'static str, ExtensionBox>);
impl Extensions {
/// Create a new, empty [`Extensions`]
pub fn new() -> Self {
Self(BTreeMap::new())
}
/// Registers a [`ConfigExtension`] with this [`ConfigOptions`]
pub fn insert<T: ConfigExtension>(&mut self, extension: T) {
assert_ne!(T::PREFIX, "datafusion");
let e = ExtensionBox(Box::new(extension));
self.0.insert(T::PREFIX, e);
}
/// Retrieves the extension of the given type if any
pub fn get<T: ConfigExtension>(&self) -> Option<&T> {
self.0.get(T::PREFIX)?.0.as_any().downcast_ref()
}
/// Retrieves the extension of the given type if any
pub fn get_mut<T: ConfigExtension>(&mut self) -> Option<&mut T> {
let e = self.0.get_mut(T::PREFIX)?;
e.0.as_any_mut().downcast_mut()
}
}
#[derive(Debug)]
struct ExtensionBox(Box<dyn ExtensionOptions>);
impl Clone for ExtensionBox {
fn clone(&self) -> Self {
Self(self.0.cloned())
}
}
/// A trait implemented by `config_namespace` and for field types that provides
/// the ability to walk and mutate the configuration tree
pub trait ConfigField {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str);
fn set(&mut self, key: &str, value: &str) -> Result<()>;
}
impl<F: ConfigField + Default> ConfigField for Option<F> {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
match self {
Some(s) => s.visit(v, key, description),
None => v.none(key, description),
}
}
fn set(&mut self, key: &str, value: &str) -> Result<()> {
self.get_or_insert_with(Default::default).set(key, value)
}
}
fn default_transform<T>(input: &str) -> Result<T>
where
T: FromStr,
<T as FromStr>::Err: Sync + Send + Error + 'static,
{
input.parse().map_err(|e| {
DataFusionError::Context(
format!(
"Error parsing '{}' as {}",
input,
std::any::type_name::<T>()
),
Box::new(DataFusionError::External(Box::new(e))),
)
})
}
#[macro_export]
macro_rules! config_field {
($t:ty) => {
config_field!($t, value => default_transform(value)?);
};
($t:ty, $arg:ident => $transform:expr) => {
impl ConfigField for $t {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}
fn set(&mut self, _: &str, $arg: &str) -> Result<()> {
*self = $transform;
Ok(())
}
}
};
}
config_field!(String);
config_field!(bool, value => default_transform(value.to_lowercase().as_str())?);
config_field!(usize);
config_field!(f64);
config_field!(u64);
impl ConfigField for u8 {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}
fn set(&mut self, key: &str, value: &str) -> Result<()> {
if value.is_empty() {
return Err(DataFusionError::Configuration(format!(
"Input string for {} key is empty",
key
)));
}
// Check if the string is a valid number
if let Ok(num) = value.parse::<u8>() {
// TODO: Let's decide how we treat the numerical strings.
*self = num;
} else {
let bytes = value.as_bytes();
// Check if the first character is ASCII (single byte)
if bytes.len() > 1 || !value.chars().next().unwrap().is_ascii() {
return Err(DataFusionError::Configuration(format!(
"Error parsing {} as u8. Non-ASCII string provided",
value
)));
}
*self = bytes[0];
}
Ok(())
}
}
impl ConfigField for CompressionTypeVariant {
fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
v.some(key, self, description)
}
fn set(&mut self, _: &str, value: &str) -> Result<()> {
*self = CompressionTypeVariant::from_str(value)?;
Ok(())
}
}
/// An implementation trait used to recursively walk configuration
pub trait Visit {
fn some<V: Display>(&mut self, key: &str, value: V, description: &'static str);
fn none(&mut self, key: &str, description: &'static str);
}
/// Convenience macro to create [`ExtensionsOptions`].
///
/// The created structure implements the following traits:
///
/// - [`Clone`]
/// - [`Debug`]
/// - [`Default`]
/// - [`ExtensionOptions`]
///
/// # Usage
/// The syntax is:
///
/// ```text
/// extensions_options! {
/// /// Struct docs (optional).
/// [<vis>] struct <StructName> {
/// /// Field docs (optional)
/// [<vis>] <field_name>: <field_type>, default = <default_value>
///
/// ... more fields
/// }
/// }
/// ```
///
/// The placeholders are:
/// - `[<vis>]`: Optional visibility modifier like `pub` or `pub(crate)`.
/// - `<StructName>`: Struct name like `MyStruct`.
/// - `<field_name>`: Field name like `my_field`.
/// - `<field_type>`: Field type like `u8`.
/// - `<default_value>`: Default value matching the field type like `42`.
///
/// # Example
/// ```
/// use datafusion_common::extensions_options;
///
/// extensions_options! {
/// /// My own config options.
/// pub struct MyConfig {
/// /// Should "foo" be replaced by "bar"?
/// pub foo_to_bar: bool, default = true
///
/// /// How many "baz" should be created?
/// pub baz_count: usize, default = 1337
/// }
/// }
/// ```
///
///
/// [`Debug`]: std::fmt::Debug
/// [`ExtensionsOptions`]: crate::config::ExtensionOptions
#[macro_export]
macro_rules! extensions_options {
(
$(#[doc = $struct_d:tt])*
$vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])*
$field_vis:vis $field_name:ident : $field_type:ty, default = $default:expr
)*$(,)*
}
) => {
$(#[doc = $struct_d])*
#[derive(Debug, Clone)]
#[non_exhaustive]
$vis struct $struct_name{
$(
$(#[doc = $d])*
$field_vis $field_name : $field_type,
)*
}
impl Default for $struct_name {
fn default() -> Self {
Self {
$($field_name: $default),*
}
}
}
impl $crate::config::ExtensionOptions for $struct_name {
fn as_any(&self) -> &dyn ::std::any::Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn ::std::any::Any {
self
}
fn cloned(&self) -> Box<dyn $crate::config::ExtensionOptions> {
Box::new(self.clone())
}
fn set(&mut self, key: &str, value: &str) -> $crate::Result<()> {
match key {
$(
stringify!($field_name) => {
self.$field_name = value.parse().map_err(|e| {
$crate::DataFusionError::Context(
format!(concat!("Error parsing {} as ", stringify!($t),), value),
Box::new($crate::DataFusionError::External(Box::new(e))),
)
})?;
Ok(())
}
)*
_ => Err($crate::DataFusionError::Configuration(
format!(concat!("Config value \"{}\" not found on ", stringify!($struct_name)), key)
))
}
}
fn entries(&self) -> Vec<$crate::config::ConfigEntry> {
vec![
$(
$crate::config::ConfigEntry {
key: stringify!($field_name).to_owned(),
value: (self.$field_name != $default).then(|| self.$field_name.to_string()),
description: concat!($($d),*).trim(),
},
)*
]
}
}
}
}
/// These file types have special built in behavior for configuration.
/// Use TableOptions::Extensions for configuring other file types.
#[derive(Debug, Clone)]
pub enum ConfigFileType {
CSV,
#[cfg(feature = "parquet")]
PARQUET,
JSON,
}
/// Represents the configuration options available for handling different table formats within a data processing application.
/// This struct encompasses options for various file formats including CSV, Parquet, and JSON, allowing for flexible configuration
/// of parsing and writing behaviors specific to each format. Additionally, it supports extending functionality through custom extensions.
#[derive(Debug, Clone, Default)]
pub struct TableOptions {
/// Configuration options for CSV file handling. This includes settings like the delimiter,
/// quote character, and whether the first row is considered as headers.
pub csv: CsvOptions,
/// Configuration options for Parquet file handling. This includes settings for compression,
/// encoding, and other Parquet-specific file characteristics.
pub parquet: TableParquetOptions,
/// Configuration options for JSON file handling.
pub json: JsonOptions,
/// The current file format that the table operations should assume. This option allows
/// for dynamic switching between the supported file types (e.g., CSV, Parquet, JSON).
pub current_format: Option<ConfigFileType>,
/// Optional extensions that can be used to extend or customize the behavior of the table
/// options. Extensions can be registered using `Extensions::insert` and might include
/// custom file handling logic, additional configuration parameters, or other enhancements.
pub extensions: Extensions,
}
impl ConfigField for TableOptions {
/// Visits configuration settings for the current file format, or all formats if none is selected.
///
/// This method adapts the behavior based on whether a file format is currently selected in `current_format`.
/// If a format is selected, it visits only the settings relevant to that format. Otherwise,
/// it visits all available format settings.
fn visit<V: Visit>(&self, v: &mut V, _key_prefix: &str, _description: &'static str) {
if let Some(file_type) = &self.current_format {
match file_type {
#[cfg(feature = "parquet")]
ConfigFileType::PARQUET => self.parquet.visit(v, "format", ""),
ConfigFileType::CSV => self.csv.visit(v, "format", ""),
ConfigFileType::JSON => self.json.visit(v, "format", ""),
}
} else {
self.csv.visit(v, "csv", "");
self.parquet.visit(v, "parquet", "");
self.json.visit(v, "json", "");
}
}
/// Sets a configuration value for a specific key within `TableOptions`.
///
/// This method delegates setting configuration values to the specific file format configurations,
/// based on the current format selected. If no format is selected, it returns an error.
///
/// # Parameters
///
/// * `key`: The configuration key specifying which setting to adjust, prefixed with the format (e.g., "format.delimiter")
/// for CSV format.
/// * `value`: The value to set for the specified configuration key.
///
/// # Returns
///
/// A result indicating success or an error if the key is not recognized, if a format is not specified,
/// or if setting the configuration value fails for the specific format.
fn set(&mut self, key: &str, value: &str) -> Result<()> {
// Extensions are handled in the public `ConfigOptions::set`
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
"format" => {
let Some(format) = &self.current_format else {
return _config_err!("Specify a format for TableOptions");
};
match format {
#[cfg(feature = "parquet")]
ConfigFileType::PARQUET => self.parquet.set(rem, value),
ConfigFileType::CSV => self.csv.set(rem, value),
ConfigFileType::JSON => self.json.set(rem, value),
}
}
_ => _config_err!("Config value \"{key}\" not found on TableOptions"),
}
}
}
impl TableOptions {
/// Constructs a new instance of `TableOptions` with default settings.
///
/// # Returns
///
/// A new `TableOptions` instance with default configuration values.
pub fn new() -> Self {
Self::default()
}
/// Creates a new `TableOptions` instance initialized with settings from a given session config.
///
/// # Parameters
///
/// * `config`: A reference to the session `ConfigOptions` from which to derive initial settings.
///
/// # Returns
///
/// A new `TableOptions` instance with settings applied from the session config.
pub fn default_from_session_config(config: &ConfigOptions) -> Self {
let initial = TableOptions::default();
initial.combine_with_session_config(config);
initial
}
/// Updates the current `TableOptions` with settings from a given session config.
///
/// # Parameters
///
/// * `config`: A reference to the session `ConfigOptions` whose settings are to be applied.
///
/// # Returns
///
/// A new `TableOptions` instance with updated settings from the session config.
pub fn combine_with_session_config(&self, config: &ConfigOptions) -> Self {
let mut clone = self.clone();
clone.parquet.global = config.execution.parquet.clone();
clone
}
/// Sets the file format for the table.
///
/// # Parameters
///
/// * `format`: The file format to use (e.g., CSV, Parquet).
pub fn set_config_format(&mut self, format: ConfigFileType) {
self.current_format = Some(format);
}
/// Sets the extensions for this `TableOptions` instance.
///
/// # Parameters
///
/// * `extensions`: The `Extensions` instance to set.
///
/// # Returns
///
/// A new `TableOptions` instance with the specified extensions applied.
pub fn with_extensions(mut self, extensions: Extensions) -> Self {
self.extensions = extensions;
self
}
/// Sets a specific configuration option.
///
/// # Parameters
///
/// * `key`: The configuration key (e.g., "format.delimiter").
/// * `value`: The value to set for the specified key.
///
/// # Returns
///
/// A result indicating success or failure in setting the configuration option.
pub fn set(&mut self, key: &str, value: &str) -> Result<()> {
let Some((prefix, _)) = key.split_once('.') else {
return _config_err!("could not find config namespace for key \"{key}\"");
};
if prefix == "format" {
return ConfigField::set(self, key, value);
}
if prefix == "execution" {
return Ok(());
}
let Some(e) = self.extensions.0.get_mut(prefix) else {
return _config_err!("Could not find config namespace \"{prefix}\"");
};
e.0.set(key, value)
}
/// Initializes a new `TableOptions` from a hash map of string settings.
///
/// # Parameters
///
/// * `settings`: A hash map where each key-value pair represents a configuration setting.
///
/// # Returns
///
/// A result containing the new `TableOptions` instance or an error if any setting could not be applied.
pub fn from_string_hash_map(settings: &HashMap<String, String>) -> Result<Self> {
let mut ret = Self::default();
for (k, v) in settings {
ret.set(k, v)?;
}
Ok(ret)
}
/// Modifies the current `TableOptions` instance with settings from a hash map.
///
/// # Parameters
///
/// * `settings`: A hash map where each key-value pair represents a configuration setting.
///
/// # Returns
///
/// A result indicating success or failure in applying the settings.
pub fn alter_with_string_hash_map(
&mut self,
settings: &HashMap<String, String>,
) -> Result<()> {
for (k, v) in settings {
self.set(k, v)?;
}
Ok(())
}
/// Retrieves all configuration entries from this `TableOptions`.
///
/// # Returns
///
/// A vector of `ConfigEntry` instances, representing all the configuration options within this `TableOptions`.
pub fn entries(&self) -> Vec<ConfigEntry> {
struct Visitor(Vec<ConfigEntry>);
impl Visit for Visitor {
fn some<V: Display>(
&mut self,
key: &str,
value: V,
description: &'static str,
) {
self.0.push(ConfigEntry {
key: key.to_string(),
value: Some(value.to_string()),
description,
})
}
fn none(&mut self, key: &str, description: &'static str) {
self.0.push(ConfigEntry {
key: key.to_string(),
value: None,
description,
})
}
}
let mut v = Visitor(vec![]);
self.visit(&mut v, "format", "");
v.0.extend(self.extensions.0.values().flat_map(|e| e.0.entries()));
v.0
}
}
/// Options that control how Parquet files are read, including global options
/// that apply to all columns and optional column-specific overrides
///
/// Closely tied to [`ParquetWriterOptions`](crate::file_options::parquet_writer::ParquetWriterOptions).
/// Properties not included in [`TableParquetOptions`] may not be configurable at the external API
/// (e.g. sorting_columns).
#[derive(Clone, Default, Debug, PartialEq)]
pub struct TableParquetOptions {
/// Global Parquet options that propagates to all columns.
pub global: ParquetOptions,
/// Column specific options. Default usage is parquet.XX::column.
pub column_specific_options: HashMap<String, ParquetColumnOptions>,
/// Additional file-level metadata to include. Inserted into the key_value_metadata
/// for the written [`FileMetaData`](https://docs.rs/parquet/latest/parquet/file/metadata/struct.FileMetaData.html).
///
/// Multiple entries are permitted
/// ```sql
/// OPTIONS (
/// 'format.metadata::key1' '',
/// 'format.metadata::key2' 'value',
/// 'format.metadata::key3' 'value has spaces',
/// 'format.metadata::key4' 'value has special chars :: :',
/// 'format.metadata::key_dupe' 'original will be overwritten',
/// 'format.metadata::key_dupe' 'final'
/// )
/// ```
pub key_value_metadata: HashMap<String, Option<String>>,
}
impl TableParquetOptions {
/// Return new default TableParquetOptions
pub fn new() -> Self {
Self::default()
}
/// Set whether the encoding of the arrow metadata should occur
/// during the writing of parquet.
///
/// Default is to encode the arrow schema in the file kv_metadata.
pub fn with_skip_arrow_metadata(self, skip: bool) -> Self {
Self {
global: ParquetOptions {
skip_arrow_metadata: skip,
..self.global
},
..self
}
}
}
impl ConfigField for TableParquetOptions {
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, description: &'static str) {
self.global.visit(v, key_prefix, description);
self.column_specific_options
.visit(v, key_prefix, description)
}
fn set(&mut self, key: &str, value: &str) -> Result<()> {
// Determine if the key is a global, metadata, or column-specific setting
if key.starts_with("metadata::") {
let k = match key.split("::").collect::<Vec<_>>()[..] {
[_meta] | [_meta, ""] => {
return _config_err!(
"Invalid metadata key provided, missing key in metadata::<key>"
)
}
[_meta, k] => k.into(),
_ => {
return _config_err!(
"Invalid metadata key provided, found too many '::' in \"{key}\""
)
}
};
self.key_value_metadata.insert(k, Some(value.into()));
Ok(())
} else if key.contains("::") {
self.column_specific_options.set(key, value)
} else {
self.global.set(key, value)
}
}
}
macro_rules! config_namespace_with_hashmap {
(
$(#[doc = $struct_d:tt])*
$vis:vis struct $struct_name:ident {
$(
$(#[doc = $d:tt])*
$field_vis:vis $field_name:ident : $field_type:ty, $(transform = $transform:expr,)? default = $default:expr
)*$(,)*
}
) => {
$(#[doc = $struct_d])*
#[derive(Debug, Clone, PartialEq)]
$vis struct $struct_name{
$(
$(#[doc = $d])*
$field_vis $field_name : $field_type,
)*
}
impl ConfigField for $struct_name {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
match key {
$(
stringify!($field_name) => {
$(let value = $transform(value);)?
self.$field_name.set(rem, value.as_ref())
},
)*
_ => _config_err!(
"Config value \"{}\" not found on {}", key, stringify!($struct_name)
)
}
}
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
$(
let key = format!(concat!("{}.", stringify!($field_name)), key_prefix);
let desc = concat!($($d),*).trim();
self.$field_name.visit(v, key.as_str(), desc);
)*
}
}
impl Default for $struct_name {
fn default() -> Self {
Self {
$($field_name: $default),*
}
}
}
impl ConfigField for HashMap<String,$struct_name> {
fn set(&mut self, key: &str, value: &str) -> Result<()> {
let parts: Vec<&str> = key.splitn(2, "::").collect();
match parts.as_slice() {
[inner_key, hashmap_key] => {
// Get or create the ColumnOptions for the specified column
let inner_value = self
.entry((*hashmap_key).to_owned())
.or_insert_with($struct_name::default);
inner_value.set(inner_key, value)
}
_ => _config_err!("Unrecognized key '{key}'."),
}
}
fn visit<V: Visit>(&self, v: &mut V, key_prefix: &str, _description: &'static str) {
for (column_name, col_options) in self {
$(
let key = format!("{}.{field}::{}", key_prefix, column_name, field = stringify!($field_name));
let desc = concat!($($d),*).trim();
col_options.$field_name.visit(v, key.as_str(), desc);
)*
}
}
}
}
}
config_namespace_with_hashmap! {
/// Options controlling parquet format for individual columns.
///
/// See [`ParquetOptions`] for more details
pub struct ParquetColumnOptions {
/// Sets if bloom filter is enabled for the column path.
pub bloom_filter_enabled: Option<bool>, default = None
/// Sets encoding for the column path.
/// Valid values are: plain, plain_dictionary, rle,
/// bit_packed, delta_binary_packed, delta_length_byte_array,
/// delta_byte_array, rle_dictionary, and byte_stream_split.
/// These values are not case-sensitive. If NULL, uses
/// default parquet options
pub encoding: Option<String>, default = None
/// Sets if dictionary encoding is enabled for the column path. If NULL, uses
/// default parquet options
pub dictionary_enabled: Option<bool>, default = None
/// Sets default parquet compression codec for the column path.
/// Valid values are: uncompressed, snappy, gzip(level),
/// lzo, brotli(level), lz4, zstd(level), and lz4_raw.
/// These values are not case-sensitive. If NULL, uses
/// default parquet options
pub compression: Option<String>, transform = str::to_lowercase, default = None
/// Sets if statistics are enabled for the column
/// Valid values are: "none", "chunk", and "page"
/// These values are not case sensitive. If NULL, uses
/// default parquet options
pub statistics_enabled: Option<String>, default = None
/// Sets bloom filter false positive probability for the column path. If NULL, uses
/// default parquet options
pub bloom_filter_fpp: Option<f64>, default = None
/// Sets bloom filter number of distinct values. If NULL, uses
/// default parquet options
pub bloom_filter_ndv: Option<u64>, default = None
/// Sets max statistics size for the column path. If NULL, uses
/// default parquet options
pub max_statistics_size: Option<usize>, default = None
}
}
config_namespace! {
/// Options controlling CSV format
pub struct CsvOptions {
/// Specifies whether there is a CSV header (i.e. the first line
/// consists of is column names). The value `None` indicates that
/// the configuration should be consulted.
pub has_header: Option<bool>, default = None
pub delimiter: u8, default = b','
pub quote: u8, default = b'"'
pub terminator: Option<u8>, default = None
pub escape: Option<u8>, default = None
pub double_quote: Option<bool>, default = None
/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub newlines_in_values: Option<bool>, default = None
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: Option<usize>, default = None
pub date_format: Option<String>, default = None
pub datetime_format: Option<String>, default = None
pub timestamp_format: Option<String>, default = None
pub timestamp_tz_format: Option<String>, default = None
pub time_format: Option<String>, default = None
// The output format for Nulls in the CSV writer.
pub null_value: Option<String>, default = None
// The input regex for Nulls when loading CSVs.
pub null_regex: Option<String>, default = None
pub comment: Option<u8>, default = None
}
}
impl CsvOptions {
/// Set a limit in terms of records to scan to infer the schema
/// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
pub fn with_compression(
mut self,
compression_type_variant: CompressionTypeVariant,
) -> Self {
self.compression = compression_type_variant;
self
}
/// Set a limit in terms of records to scan to infer the schema
/// - default to `DEFAULT_SCHEMA_INFER_MAX_RECORD`
pub fn with_schema_infer_max_rec(mut self, max_rec: usize) -> Self {
self.schema_infer_max_rec = Some(max_rec);
self
}
/// Set true to indicate that the first line is a header.
/// - default to true
pub fn with_has_header(mut self, has_header: bool) -> Self {
self.has_header = Some(has_header);
self
}
/// Returns true if the first line is a header. If format options does not
/// specify whether there is a header, returns `None` (indicating that the
/// configuration should be consulted).
pub fn has_header(&self) -> Option<bool> {
self.has_header
}
/// The character separating values within a row.
/// - default to ','
pub fn with_delimiter(mut self, delimiter: u8) -> Self {
self.delimiter = delimiter;
self
}
/// The quote character in a row.
/// - default to '"'
pub fn with_quote(mut self, quote: u8) -> Self {
self.quote = quote;
self
}
/// The character that terminates a row.
/// - default to None (CRLF)
pub fn with_terminator(mut self, terminator: Option<u8>) -> Self {
self.terminator = terminator;
self
}
/// The escape character in a row.
/// - default is None
pub fn with_escape(mut self, escape: Option<u8>) -> Self {
self.escape = escape;
self
}
/// Set true to indicate that the CSV quotes should be doubled.
/// - default to true
pub fn with_double_quote(mut self, double_quote: bool) -> Self {
self.double_quote = Some(double_quote);
self
}
/// Specifies whether newlines in (quoted) values are supported.
///
/// Parsing newlines in quoted values may be affected by execution behaviour such as
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
/// parsed successfully, which may reduce performance.
///
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
self.newlines_in_values = Some(newlines_in_values);
self
}
/// Set a `CompressionTypeVariant` of CSV
/// - defaults to `CompressionTypeVariant::UNCOMPRESSED`
pub fn with_file_compression_type(
mut self,
compression: CompressionTypeVariant,
) -> Self {
self.compression = compression;
self
}
/// The delimiter character.
pub fn delimiter(&self) -> u8 {
self.delimiter
}
/// The quote character.
pub fn quote(&self) -> u8 {
self.quote
}
/// The terminator character.
pub fn terminator(&self) -> Option<u8> {
self.terminator
}
/// The escape character.
pub fn escape(&self) -> Option<u8> {
self.escape
}
}
config_namespace! {
/// Options controlling JSON format
pub struct JsonOptions {
pub compression: CompressionTypeVariant, default = CompressionTypeVariant::UNCOMPRESSED
pub schema_infer_max_rec: Option<usize>, default = None
}
}
pub trait FormatOptionsExt: Display {}
#[derive(Debug, Clone, PartialEq)]
#[allow(clippy::large_enum_variant)]
pub enum FormatOptions {
CSV(CsvOptions),
JSON(JsonOptions),
#[cfg(feature = "parquet")]
PARQUET(TableParquetOptions),
AVRO,
ARROW,
}
impl Display for FormatOptions {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let out = match self {
FormatOptions::CSV(_) => "csv",
FormatOptions::JSON(_) => "json",
#[cfg(feature = "parquet")]
FormatOptions::PARQUET(_) => "parquet",
FormatOptions::AVRO => "avro",
FormatOptions::ARROW => "arrow",
};
write!(f, "{}", out)
}
}
#[cfg(test)]
mod tests {
use std::any::Any;
use std::collections::HashMap;
use crate::config::{
ConfigEntry, ConfigExtension, ConfigFileType, ExtensionOptions, Extensions,
TableOptions,
};
#[derive(Default, Debug, Clone)]
pub struct TestExtensionConfig {
/// Should "foo" be replaced by "bar"?
pub properties: HashMap<String, String>,
}
impl ExtensionOptions for TestExtensionConfig {
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn cloned(&self) -> Box<dyn ExtensionOptions> {
Box::new(self.clone())
}
fn set(&mut self, key: &str, value: &str) -> crate::Result<()> {
let (key, rem) = key.split_once('.').unwrap_or((key, ""));
assert_eq!(key, "test");
self.properties.insert(rem.to_owned(), value.to_owned());
Ok(())
}
fn entries(&self) -> Vec<ConfigEntry> {
self.properties
.iter()
.map(|(k, v)| ConfigEntry {
key: k.into(),
value: Some(v.into()),
description: "",
})
.collect()
}
}
impl ConfigExtension for TestExtensionConfig {
const PREFIX: &'static str = "test";
}
#[test]
fn create_table_config() {
let mut extension = Extensions::new();
extension.insert(TestExtensionConfig::default());
let table_config = TableOptions::new().with_extensions(extension);
let kafka_config = table_config.extensions.get::<TestExtensionConfig>();
assert!(kafka_config.is_some())
}
#[test]
fn alter_test_extension_config() {
let mut extension = Extensions::new();
extension.insert(TestExtensionConfig::default());
let mut table_config = TableOptions::new().with_extensions(extension);
table_config.set_config_format(ConfigFileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter, b';');
table_config.set("test.bootstrap.servers", "asd").unwrap();
let kafka_config = table_config
.extensions
.get::<TestExtensionConfig>()
.unwrap();
assert_eq!(
kafka_config.properties.get("bootstrap.servers").unwrap(),
"asd"
);
}
#[test]
fn csv_u8_table_options() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::CSV);
table_config.set("format.delimiter", ";").unwrap();
assert_eq!(table_config.csv.delimiter as char, ';');
table_config.set("format.escape", "\"").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '"');
table_config.set("format.escape", "\'").unwrap();
assert_eq!(table_config.csv.escape.unwrap() as char, '\'');
}
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
assert_eq!(
table_config.parquet.column_specific_options["col1"].bloom_filter_enabled,
Some(true)
);
}
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_entry() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config
.set("format.bloom_filter_enabled::col1", "true")
.unwrap();
let entries = table_config.entries();
assert!(entries
.iter()
.any(|item| item.key == "format.bloom_filter_enabled::col1"))
}
#[cfg(feature = "parquet")]
#[test]
fn parquet_table_options_config_metadata_entry() {
let mut table_config = TableOptions::new();
table_config.set_config_format(ConfigFileType::PARQUET);
table_config.set("format.metadata::key1", "").unwrap();
table_config.set("format.metadata::key2", "value2").unwrap();
table_config
.set("format.metadata::key3", "value with spaces ")
.unwrap();
table_config
.set("format.metadata::key4", "value with special chars :: :")
.unwrap();
let parsed_metadata = table_config.parquet.key_value_metadata.clone();
assert_eq!(parsed_metadata.get("should not exist1"), None);
assert_eq!(parsed_metadata.get("key1"), Some(&Some("".into())));
assert_eq!(parsed_metadata.get("key2"), Some(&Some("value2".into())));
assert_eq!(
parsed_metadata.get("key3"),
Some(&Some("value with spaces ".into()))
);
assert_eq!(
parsed_metadata.get("key4"),
Some(&Some("value with special chars :: :".into()))
);
// duplicate keys are overwritten
table_config.set("format.metadata::key_dupe", "A").unwrap();
table_config.set("format.metadata::key_dupe", "B").unwrap();
let parsed_metadata = table_config.parquet.key_value_metadata;
assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
}
}