blob: b5a6760cae02043078ee0309b84ae5ca18fa6bcb [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Common behaviors that every file format needs to implement
use std::any::Any;
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
#[expect(deprecated)]
use crate::schema_adapter::SchemaAdapterFactory;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::TreeNodeRecursion;
use datafusion_common::{Result, not_impl_err};
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr};
use datafusion_physical_plan::DisplayFormatType;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use object_store::ObjectStore;
/// Wraps a concrete [`FileSource`] implementation in an [`Arc`] and erases its
/// type, yielding the `Arc<dyn FileSource>` form used throughout the planner.
pub fn as_file_source<T: FileSource + 'static>(source: T) -> Arc<dyn FileSource> {
    // The `Arc<T>` built here is unsized to `Arc<dyn FileSource>` at the return site.
    Arc::<T>::new(source)
}
/// File format specific behaviors for [`DataSource`]
///
/// # Schema information
/// There are two important schemas for a [`FileSource`]:
/// 1. [`Self::table_schema`] -- the schema for the overall table
///    (file data plus partition columns)
/// 2. The logical output schema, consisting of [`Self::table_schema`] with
///    [`Self::projection`] applied
///
/// See more details on specific implementations:
/// * [`ArrowSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ArrowSource.html)
/// * [`AvroSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.AvroSource.html)
/// * [`CsvSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.CsvSource.html)
/// * [`JsonSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.JsonSource.html)
/// * [`ParquetSource`](https://docs.rs/datafusion/latest/datafusion/datasource/physical_plan/struct.ParquetSource.html)
///
/// [`DataSource`]: crate::source::DataSource
pub trait FileSource: Send + Sync {
    /// Creates a `dyn FileOpener` based on given parameters
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Result<Arc<dyn FileOpener>>;

    /// Returns `self` as [`Any`] so callers can downcast to the concrete
    /// `FileSource` implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the table schema for the overall table (including partition columns, if any)
    ///
    /// This method returns the unprojected schema: the full schema of the data
    /// without [`Self::projection`] applied.
    ///
    /// The output schema of this `FileSource` is this TableSchema
    /// with [`Self::projection`] applied.
    ///
    /// Use [`ProjectionExprs::project_schema`] to get the projected schema
    /// after applying the projection.
    fn table_schema(&self) -> &crate::table_schema::TableSchema;

    /// Returns a new [`FileSource`] configured with the given `batch_size`.
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;

    /// Returns the filter expression that will be applied *during* the file scan.
    ///
    /// These expressions are in terms of the unprojected [`Self::table_schema`].
    ///
    /// The default implementation returns `None` (no filter).
    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
        None
    }

    /// Return the projection that will be applied to the output stream on top
    /// of [`Self::table_schema`].
    ///
    /// Note you can use [`ProjectionExprs::project_schema`] on the table
    /// schema to get the effective output schema of this source.
    ///
    /// The default implementation returns `None` (no projection).
    fn projection(&self) -> Option<&ProjectionExprs> {
        None
    }

    /// Returns the [`ExecutionPlanMetricsSet`] associated with this source.
    fn metrics(&self) -> &ExecutionPlanMetricsSet;

    /// Returns a short name for the file type, such as `"csv"`, `"json"` or
    /// `"parquet"`.
    fn file_type(&self) -> &str;

    /// Writes file-type-specific details for display of the plan.
    ///
    /// The default implementation writes nothing.
    fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
        Ok(())
    }

    /// Returns whether this file source supports repartitioning files by byte ranges.
    ///
    /// When this returns `true`, files can be split into multiple partitions
    /// based on byte offsets for parallel reading.
    ///
    /// When this returns `false`, files cannot be repartitioned (e.g., CSV files
    /// with `newlines_in_values` enabled cannot be split because record boundaries
    /// cannot be determined by byte offset alone).
    ///
    /// The default implementation returns `true`. File sources that cannot support
    /// repartitioning should override this method.
    fn supports_repartitioning(&self) -> bool {
        true
    }

    /// If supported by the [`FileSource`], redistribute files across partitions
    /// according to their size. Allows custom file formats to implement their
    /// own repartitioning logic.
    ///
    /// The default implementation uses [`FileGroupPartitioner`]. See that
    /// struct for more details. `output_ordering` is only inspected to decide
    /// whether ordering must be preserved within each new file group.
    ///
    /// Returns `Ok(None)` (leave `config` unchanged) when:
    /// - the files are compressed,
    /// - [`Self::supports_repartitioning`] returns `false`, or
    /// - the partitioner does not produce a new grouping.
    ///
    /// Otherwise returns a clone of `config` whose `file_groups` have been
    /// replaced by the repartitioned groups.
    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
        config: &FileScanConfig,
    ) -> Result<Option<FileScanConfig>> {
        // Compressed files are never split; presumably byte offsets into a
        // compressed stream cannot be mapped onto record boundaries.
        if config.file_compression_type.is_compressed() || !self.supports_repartitioning()
        {
            return Ok(None);
        }

        let repartitioned_file_groups = FileGroupPartitioner::new()
            .with_target_partitions(target_partitions)
            .with_repartition_file_min_size(repartition_file_min_size)
            // Only ask the partitioner to preserve order when the caller
            // supplied an output ordering.
            .with_preserve_order_within_groups(output_ordering.is_some())
            .repartition_file_groups(&config.file_groups);

        // Only clone the config when there is actually a new grouping to apply.
        Ok(repartitioned_file_groups.map(|file_groups| {
            let mut repartitioned_config = config.clone();
            repartitioned_config.file_groups = file_groups;
            repartitioned_config
        }))
    }

    /// Try to push down filters into this FileSource.
    ///
    /// `filters` must be in terms of the unprojected table schema (file schema
    /// plus partition columns), before any projection is applied.
    ///
    /// Any filters that this FileSource chooses to evaluate itself should be
    /// returned as `PushedDown::Yes` in the result, along with a FileSource
    /// instance that incorporates those filters. Such filters are logically
    /// applied "during" the file scan, meaning they may refer to columns not
    /// included in the final output projection.
    ///
    /// Filters that cannot be pushed down should be marked as `PushedDown::No`,
    /// and will be evaluated by an execution plan after the file source.
    ///
    /// The default implementation marks every filter as `PushedDown::No`.
    ///
    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
    ///
    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
            vec![PushedDown::No; filters.len()],
        ))
    }

    /// Try to create a new FileSource that can produce data in the specified sort order.
    ///
    /// This method attempts to optimize data retrieval to match the requested ordering.
    /// It receives both the requested ordering and equivalence properties that describe
    /// the output data from this file source.
    ///
    /// # Parameters
    /// * `order` - The requested sort ordering from the query
    /// * `eq_properties` - Equivalence properties of the data that will be produced by this
    ///   file source. These properties describe the ordering, constant columns, and other
    ///   relationships in the output data, allowing the implementation to determine if
    ///   optimizations like reversed scanning can help satisfy the requested ordering.
    ///   This includes information about:
    ///   - The file's natural ordering (from output_ordering in FileScanConfig)
    ///   - Constant columns (e.g., from filters like `ticker = 'AAPL'`)
    ///   - Monotonic functions (e.g., `extract_year_month(timestamp)`)
    ///   - Other equivalence relationships
    ///
    /// # Examples
    ///
    /// ## Example 1: Simple reverse
    /// ```text
    /// File ordering: [a ASC, b DESC]
    /// Requested: [a DESC]
    /// Reversed file: [a DESC, b ASC]
    /// Result: Satisfies request (prefix match) → Inexact
    /// ```
    ///
    /// ## Example 2: Monotonic function
    /// ```text
    /// File ordering: [extract_year_month(ts) ASC, ts ASC]
    /// Requested: [ts DESC]
    /// Reversed file: [extract_year_month(ts) DESC, ts DESC]
    /// Result: Through monotonicity, satisfies [ts DESC] → Inexact
    /// ```
    ///
    /// # Returns
    /// * `Exact` - Created a source that guarantees perfect ordering
    /// * `Inexact` - Created a source optimized for ordering (e.g., reversed row groups) but not perfectly sorted
    /// * `Unsupported` - Cannot optimize for this ordering
    ///
    /// # Deprecation / migration notes
    /// - [`Self::try_reverse_output`] was renamed to this method and deprecated since `53.0.0`.
    ///   Per DataFusion's deprecation guidelines, it will be removed in `59.0.0` or later
    ///   (6 major versions or 6 months, whichever is longer).
    /// - New implementations should override [`Self::try_pushdown_sort`] directly.
    /// - For backwards compatibility, the default implementation of
    ///   [`Self::try_pushdown_sort`] delegates to the deprecated
    ///   [`Self::try_reverse_output`] until it is removed. After that point, the
    ///   default implementation will return [`SortOrderPushdownResult::Unsupported`].
    fn try_pushdown_sort(
        &self,
        order: &[PhysicalSortExpr],
        eq_properties: &EquivalenceProperties,
    ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
        #[expect(deprecated)]
        self.try_reverse_output(order, eq_properties)
    }

    /// Deprecated: Renamed to [`Self::try_pushdown_sort`].
    ///
    /// The default implementation returns [`SortOrderPushdownResult::Unsupported`].
    #[deprecated(
        since = "53.0.0",
        note = "Renamed to try_pushdown_sort. This method was never limited to reversing output. It will be removed in 59.0.0 or later."
    )]
    fn try_reverse_output(
        &self,
        _order: &[PhysicalSortExpr],
        _eq_properties: &EquivalenceProperties,
    ) -> Result<SortOrderPushdownResult<Arc<dyn FileSource>>> {
        Ok(SortOrderPushdownResult::Unsupported)
    }

    /// Try to push down a projection into this FileSource.
    ///
    /// `FileSource` implementations that support projection pushdown should
    /// override this method and return a new `FileSource` instance with the
    /// projection incorporated. The default implementation returns `Ok(None)`,
    /// meaning the projection is not pushed down.
    ///
    /// If a `FileSource` does accept a projection it is expected to handle
    /// the projection in its entirety, including partition columns.
    /// For example, the `FileSource` may translate that projection into a
    /// file format specific projection (e.g. Parquet can push down struct field access,
    /// some other file formats like Vortex can push down computed expressions into un-decoded data)
    /// and also need to handle partition column projection (generally done by replacing partition column
    /// references with literal values derived from each file's partition values).
    ///
    /// Not all `FileSource` implementations can handle complex expression pushdowns.
    /// For example, a CSV file source may only support simple column selections.
    /// In such cases, the `FileSource` can use [`SplitProjection`] and [`ProjectionOpener`]
    /// to split the projection into a part that can be pushed down and a part that cannot.
    /// These helpers also handle partition column projection.
    ///
    /// [`SplitProjection`]: crate::projection::SplitProjection
    /// [`ProjectionOpener`]: crate::projection::ProjectionOpener
    fn try_pushdown_projection(
        &self,
        _projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        Ok(None)
    }

    /// Deprecated: Set optional schema adapter factory.
    ///
    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
    /// See `upgrading.md` for more details.
    ///
    /// The default implementation always returns a "not implemented" error.
    #[deprecated(
        since = "53.0.0",
        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
    )]
    #[expect(deprecated)]
    fn with_schema_adapter_factory(
        &self,
        _factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        not_impl_err!(
            "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
        )
    }

    /// Deprecated: Returns the current schema adapter factory if set.
    ///
    /// `SchemaAdapterFactory` has been removed. Use `PhysicalExprAdapterFactory` instead.
    /// See `upgrading.md` for more details.
    ///
    /// The default implementation always returns `None`.
    #[deprecated(
        since = "53.0.0",
        note = "SchemaAdapterFactory has been removed. Use PhysicalExprAdapterFactory instead. See upgrading.md for more details."
    )]
    #[expect(deprecated)]
    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        None
    }

    /// Apply a function to all physical expressions used by this file source.
    ///
    /// This includes:
    /// - Filter predicates (which may contain dynamic filters)
    /// - Projection expressions
    ///
    /// The function `f` is called once for each expression. The function should
    /// return `TreeNodeRecursion::Continue` to continue visiting other expressions,
    /// or `TreeNodeRecursion::Stop` to stop visiting expressions early.
    ///
    /// Implementations must explicitly visit all expressions. There is no default
    /// implementation to ensure that all FileSource implementations handle this correctly.
    ///
    /// See [`ExecutionPlan::apply_expressions`] for more details and examples.
    ///
    /// [`ExecutionPlan::apply_expressions`]: datafusion_physical_plan::ExecutionPlan::apply_expressions
    fn apply_expressions(
        &self,
        f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
    ) -> Result<TreeNodeRecursion>;
}