feat: sort file groups by statistics during sort pushdown (Sort pushdown phase 2) (#21182)

## Which issue does this PR close?

- Closes https://github.com/apache/datafusion/issues/17348
- Closes https://github.com/apache/datafusion/issues/19329

## Rationale for this change

When a partition (file group) contains multiple files in the wrong order, `validated_output_ordering()` strips the ordering and `EnforceSorting` inserts an unnecessary `SortExec` — even though the files are non-overlapping and internally sorted.

This PR fixes that by **sorting files within each group by min/max statistics** during sort pushdown. After sorting, the file order matches the sort key order, the ordering becomes valid, and the `SortExec` can be eliminated. This works for both single-partition and multi-partition plans with multi-file groups.

## What changes are included in this PR?

### Core optimization

```text
Files in wrong order within a partition:    After statistics-based sorting:
[a_high(400k+), b_mid(200k), c_low(1)]      [c_low(1), b_mid(200k), a_high(400k+)]
→ ordering stripped                         → ordering valid, non-overlapping
→ SortExec stays                            → SortExec eliminated
```

When `PushdownSort` finds a `SortExec` above a file-based `DataSourceExec`:

1. **FileSource returns Exact** (natural ordering satisfies the request):
   - Sort files within each group by statistics, verify they are non-overlapping
   - `SortExec` removed, fetch (LIMIT) pushed down to `DataSourceExec`
2. **FileSource returns Unsupported** (ordering stripped due to wrong file order):
   - Sort files within each group by statistics
   - Re-check: if the files are now non-overlapping and the ordering is valid → upgrade to Exact
   - `SortExec` eliminated and fetch pushed down
3. **FileSource returns Inexact** (reverse scan):
   - `SortExec` kept, scan optimized with `reverse_row_groups`

### Files Changed

| File | Change |
|------|--------|
| `datasource-parquet/src/source.rs` | `ParquetSource` returns `Exact` when natural ordering satisfies the request |
| `datasource/src/file_scan_config.rs` | Statistics-based file sorting, non-overlapping re-check, Unsupported→Exact upgrade |
| `physical-optimizer/src/pushdown_sort.rs` | Preserve fetch (LIMIT) when eliminating `SortExec`; module doc update |
| `core/tests/physical_optimizer/pushdown_sort.rs` | Updated prefix match test |
| `sqllogictest/test_files/sort_pushdown.slt` | Updated existing tests + 5 new test groups (A–E) |

## Benchmark Results

Local release build, `--partitions 1`, 3 non-overlapping files with reversed naming (6M rows):

| Query | Description | Main (ms) | PR (ms) | Speedup |
|-------|-------------|-----------|---------|---------|
| Q1 | `ORDER BY ASC` (full scan) | 259 | 122 | **2.1x faster** |
| Q2 | `ORDER BY ASC LIMIT 100` | 80 | 3 | **27x faster** |
| Q3 | `SELECT * ORDER BY ASC` | 700 | 313 | **2.2x faster** |
| Q4 | `SELECT * LIMIT 100` | 342 | 7 | **49x faster** |

LIMIT queries benefit most because sort elimination plus limit pushdown means only the first ~100 rows are read.
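The core re-check can be sketched in plain Rust. This is a simplified, hypothetical illustration of the idea described above, not DataFusion's actual types: `FileStats` stands in for per-file min/max statistics on the sort key, and the two helpers mirror the "sort by statistics, then verify non-overlapping" steps.

```rust
/// Hypothetical stand-in for per-file min/max statistics on the sort key.
#[derive(Debug, Clone, PartialEq)]
struct FileStats {
    name: &'static str,
    min: i64, // minimum of the sort key in this file
    max: i64, // maximum of the sort key in this file
}

/// Sort files in a group by the minimum of the sort key (ascending).
fn sort_by_statistics(files: &mut [FileStats]) {
    files.sort_by_key(|f| f.min);
}

/// After sorting, the group's ordering is valid only if no two adjacent
/// files overlap on the sort key range.
fn non_overlapping(files: &[FileStats]) -> bool {
    files.windows(2).all(|w| w[0].max < w[1].min)
}

fn main() {
    // Files in "wrong" order within one partition, as in the example above.
    let mut group = vec![
        FileStats { name: "a_high", min: 400_000, max: 600_000 },
        FileStats { name: "b_mid", min: 200_000, max: 399_999 },
        FileStats { name: "c_low", min: 1, max: 199_999 },
    ];

    sort_by_statistics(&mut group);

    // Now [c_low, b_mid, a_high]: the file order matches the sort key
    // order and the ranges are disjoint, so the SortExec above the scan
    // could be eliminated (the Unsupported → Exact upgrade).
    assert_eq!(group[0].name, "c_low");
    assert!(non_overlapping(&group));
    println!("{:?}", group.iter().map(|f| f.name).collect::<Vec<_>>());
}
```

If any two ranges overlap after sorting, the re-check fails and the `SortExec` must stay, since concatenating the files no longer yields globally sorted output.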
## Tests

- 13 new unit tests covering all sort pushdown paths
- 5 new SLT integration test groups (sort elimination, overlapping files, LIMIT, multi-partition, inferred ordering)
- All existing tests pass with no regressions

## Test plan

- [x] `cargo test -p datafusion-datasource` — all tests pass
- [x] `cargo test -p datafusion-datasource-parquet` — all tests pass
- [x] `cargo test -p datafusion-physical-optimizer` — all tests pass
- [x] `cargo test -p datafusion --test core_integration` — all tests pass
- [x] SLT sort/order/topk/window/union/joins tests pass (no regressions)
- [x] `cargo clippy` — 0 warnings

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
DataFusion is an extensible query engine written in Rust that uses Apache Arrow as its in-memory format.
This crate provides libraries and binaries for developers building fast and feature-rich database and analytic systems, customized for particular workloads. See use cases for examples. The following related subprojects target end users:
“Out of the box,” DataFusion offers SQL and DataFrame APIs, excellent performance, built-in support for CSV, Parquet, JSON, and Avro, extensive customization, and a great community.
DataFusion features a full query planner, a columnar, streaming, multi-threaded, vectorized execution engine, and partitioned data sources. You can customize DataFusion at almost all points including additional data sources, query languages, functions, custom operators and more. See the Architecture section for more details.
Here are links to important resources:
DataFusion is great for building projects such as domain-specific query engines, new database platforms and data pipelines, query languages, and more. It lets you start quickly from a fully working engine and then customize the features relevant to your needs. See the list of known users.
Please see the contributor guide and communication pages for more information.
This crate has several features which can be specified in your Cargo.toml.
Default features:
- `nested_expressions`: functions for working with nested types such as `array_to_string`
- `compression`: reading files compressed with xz2, bzip2, flate2, and zstd
- `crypto_expressions`: cryptographic functions such as `md5` and `sha256`
- `datetime_expressions`: date and time functions such as `to_timestamp`
- `encoding_expressions`: `encode` and `decode` functions
- `parquet`: support for reading the Apache Parquet format
- `sql`: support for SQL parsing and planning
- `regex_expressions`: regular expression functions, such as `regexp_match`
- `unicode_expressions`: include Unicode-aware functions such as `character_length`
- `unparser`: enables support to reverse LogicalPlans back into SQL
- `recursive_protection`: uses `recursive` for stack overflow protection

Optional features:

- `avro`: support for reading the Apache Avro format
- `backtrace`: include backtrace information in error messages
- `parquet_encryption`: support for using Parquet Modular Encryption
- `serde`: enable arrow-schema's `serde` feature

Public methods in Apache DataFusion evolve over time: while we try to maintain a stable API, we also improve the API over time. As a result, we typically deprecate methods before removing them, according to the deprecation guidelines.
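For example, optional features can be enabled alongside the defaults in a downstream crate's `Cargo.toml` (a minimal sketch; the version number is illustrative):

```toml
[dependencies]
# Enable Avro support and backtraces in error messages in addition
# to the default feature set.
datafusion = { version = "50", features = ["avro", "backtrace"] }
```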
Cargo.lock

Following the guidance on committing Cargo.lock files, this project commits its Cargo.lock file.
CI uses the committed Cargo.lock file, and dependencies are updated regularly using Dependabot PRs.