blob: 308e91d0df145a55ec6f456696103faa217e8dc6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Sort Pushdown Optimization
//!
//! This optimizer attempts to push sort requirements down through the execution plan
//! tree to data sources that can natively handle them (e.g., by scanning files in
//! reverse order).
//!
//! ## How it works
//!
//! 1. Detects `SortExec` nodes in the plan
//! 2. Calls `try_pushdown_sort()` on the input to recursively push the sort requirement
//! 3. Each node type defines its own pushdown behavior:
//!    - **Transparent nodes** (CoalesceBatchesExec, RepartitionExec, etc.) delegate to
//!      their children and wrap the result
//!    - **Data sources** (DataSourceExec) check if they can optimize for the ordering
//!    - **Blocking nodes** return `Unsupported` to stop pushdown
//! 4. Based on the result:
//!    - `Exact`: Remove the Sort operator (data source guarantees perfect ordering)
//!    - `Inexact`: Keep Sort but use optimized input (enables early termination for TopK)
//!    - `Unsupported`: No change
//!
//! ## Capabilities
//!
//! - **Sort elimination**: when a data source's natural ordering satisfies the
//! request, return `Exact` and remove the `SortExec` entirely. Preserves
//! `fetch` (LIMIT) from the eliminated `SortExec` for early termination.
//! - **Statistics-based file sorting**: sort files within each partition by
//! min/max statistics. When files are non-overlapping but listed in wrong
//! order (e.g., alphabetical order ≠ sort key order), this fixes the ordering
//! and enables sort elimination. Works for both single-partition and
//! multi-partition plans with multi-file groups.
//! - **Reverse scan optimization**: when required sort is the reverse of the data source's
//! natural ordering, enable reverse scanning (reading row groups in reverse order)
//! - **Prefix matching**: if data has ordering [A DESC, B ASC] and query needs
//! [A DESC], the existing ordering satisfies the requirement (`Exact`).
//! If the query needs [A ASC] (reverse of the prefix), a reverse scan is
//! used (`Inexact`, `SortExec` retained)
//!
//! Related issue: <https://github.com/apache/datafusion/issues/17348>
use crate::PhysicalOptimizerRule;
use datafusion_common::Result;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_plan::ExecutionPlan;
use datafusion_physical_plan::SortOrderPushdownResult;
use datafusion_physical_plan::buffer::BufferExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use std::sync::Arc;
/// Capacity, in bytes and per partition, of the `BufferExec` placed under a
/// `SortPreservingMergeExec` once this rule has removed the `SortExec` that
/// used to sit there.
///
/// The removed `SortExec` buffered its whole input before emitting rows, so
/// the merge above it never waited on I/O. With the sort gone the merge would
/// pull straight from the (I/O-bound) source; a bounded buffer lets reading
/// overlap with merging instead.
///
/// The buffer is only inserted on the sort-elimination path of this rule, so
/// plans that the rule does not rewrite are unaffected. Its footprint is
/// capped by this constant, whereas the replaced sort had to hold all of its
/// input.
const BUFFER_CAPACITY_AFTER_SORT_ELIMINATION: usize = 64 << 20; // 64 MiB
/// Physical optimizer rule that tries to push sort requirements down to the
/// data sources of a plan.
///
/// The rule holds no state of its own. The rewrite it performs is described
/// in the module-level documentation.
#[derive(Clone, Debug, Default)]
pub struct PushdownSort;

impl PushdownSort {
    /// Creates a new instance of the rule.
    pub fn new() -> Self {
        Self
    }
}
impl PhysicalOptimizerRule for PushdownSort {
    /// Pushes the ordering required by each `SortExec` in `plan` into that
    /// sort's input and rewrites the plan according to the outcome.
    ///
    /// Returns `plan` unchanged when `optimizer.enable_sort_pushdown` is off.
    /// Otherwise the plan is walked top-down and two shapes are rewritten:
    ///
    /// 1. `SortPreservingMergeExec` directly over a `SortExec` that preserves
    ///    partitioning.
    /// 2. Any other `SortExec`.
    ///
    /// For both shapes the result of `try_pushdown_sort` on the sort's input
    /// decides the rewrite:
    ///
    /// - `Exact`: the `SortExec` is removed, provided its `fetch` (if any) can
    ///   still be honored. If the optimized input cannot absorb the `fetch`
    ///   and nothing else enforces it, the `SortExec` is kept on top of the
    ///   optimized input so the limit is never lost.
    /// - `Inexact`: the `SortExec` is rebuilt over the optimized input.
    /// - `Unsupported`: the node is left as is.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `try_pushdown_sort` or by the tree
    /// traversal.
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // The rule is gated by configuration.
        if !config.optimizer.enable_sort_pushdown {
            return Ok(plan);
        }

        // `transform_down` visits every node, so nested sorts are handled too.
        plan.transform_down(|plan: Arc<dyn ExecutionPlan>| {
            // Pattern 1: SPM → SortExec(preserve_partitioning).
            // Removing the sort here also removes the buffering the merge
            // relied on, so a `BufferExec` takes its place.
            if let Some(spm) = plan.downcast_ref::<SortPreservingMergeExec>()
                && let Some(sort_child) = spm.input().downcast_ref::<SortExec>()
                && sort_child.preserve_partitioning()
            {
                let sort_input = Arc::clone(sort_child.input());
                let required_ordering = sort_child.expr();
                let merge_input = match sort_input.try_pushdown_sort(required_ordering)? {
                    SortOrderPushdownResult::Exact { inner } => {
                        let partition_fetch = sort_child.fetch();
                        match apply_fetch(&inner, partition_fetch) {
                            // The per-partition limit (if any) moved into the input.
                            Some(limited) => buffer_merge_input(limited),
                            // The input cannot take the limit, but the merge's own
                            // fetch already makes it redundant.
                            None if merge_fetch_subsumes(spm.fetch(), partition_fetch) => {
                                buffer_merge_input(inner)
                            }
                            // Nothing else would enforce the limit: keep the sort
                            // (over the optimized input) rather than drop it.
                            None => rebuild_sort(sort_child, inner),
                        }
                    }
                    SortOrderPushdownResult::Inexact { inner } => {
                        rebuild_sort(sort_child, inner)
                    }
                    SortOrderPushdownResult::Unsupported => {
                        return Ok(Transformed::no(plan));
                    }
                };
                let new_spm = SortPreservingMergeExec::new(spm.expr().clone(), merge_input)
                    .with_fetch(spm.fetch());
                return Ok(Transformed::yes(Arc::new(new_spm)));
            }

            // Pattern 2: any other SortExec.
            let Some(sort_exec) = plan.downcast_ref::<SortExec>() else {
                return Ok(Transformed::no(plan));
            };
            let sort_input = Arc::clone(sort_exec.input());
            let required_ordering = sort_exec.expr();

            // Each node type decides how (and whether) the requirement can be
            // pushed through it via `try_pushdown_sort`.
            match sort_input.try_pushdown_sort(required_ordering)? {
                SortOrderPushdownResult::Exact { inner } => {
                    // The input already yields the required order. Drop the sort
                    // only if its limit survives; otherwise keep the sort so the
                    // row count of the result does not change.
                    let rewritten = match apply_fetch(&inner, sort_exec.fetch()) {
                        Some(limited) => limited,
                        None => rebuild_sort(sort_exec, inner),
                    };
                    Ok(Transformed::yes(rewritten))
                }
                SortOrderPushdownResult::Inexact { inner } => {
                    // The input was optimized for the ordering but does not
                    // guarantee it, so the sort stays on top.
                    Ok(Transformed::yes(rebuild_sort(sort_exec, inner)))
                }
                SortOrderPushdownResult::Unsupported => {
                    // Nothing to push down.
                    Ok(Transformed::no(plan))
                }
            }
        })
        .data()
    }

    /// Name under which this rule is reported.
    fn name(&self) -> &str {
        "PushdownSort"
    }

    /// This rule opts into the optimizer's schema check.
    fn schema_check(&self) -> bool {
        true
    }
}

/// Applies `fetch` to `plan` via `ExecutionPlan::with_fetch`.
///
/// Returns `plan` itself when `fetch` is `None`, the limited plan when the
/// node accepts the limit, and `None` when a limit is required but the node
/// cannot absorb it.
fn apply_fetch(
    plan: &Arc<dyn ExecutionPlan>,
    fetch: Option<usize>,
) -> Option<Arc<dyn ExecutionPlan>> {
    match fetch {
        None => Some(Arc::clone(plan)),
        Some(_) => plan.with_fetch(fetch),
    }
}

/// Builds a `SortExec` over `input` with the same ordering, fetch and
/// partitioning mode as `original`.
fn rebuild_sort(
    original: &SortExec,
    input: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    Arc::new(
        SortExec::new(original.expr().clone(), input)
            .with_fetch(original.fetch())
            .with_preserve_partitioning(original.preserve_partitioning()),
    )
}

/// Wraps `input` in the bounded `BufferExec` that stands in for an eliminated
/// `SortExec` below a merge.
fn buffer_merge_input(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    Arc::new(BufferExec::new(
        input,
        BUFFER_CAPACITY_AFTER_SORT_ELIMINATION,
    ))
}

/// Returns `true` when a merge limited to `merge_fetch` rows produces the same
/// output whether or not each of its sorted inputs is first truncated to
/// `partition_fetch` rows.
///
/// The first `m` rows of a merge can only come from the first `m` rows of each
/// input, so truncating inputs to `p >= m` rows changes nothing.
fn merge_fetch_subsumes(merge_fetch: Option<usize>, partition_fetch: Option<usize>) -> bool {
    match (merge_fetch, partition_fetch) {
        (_, None) => true,
        (Some(m), Some(p)) => m <= p,
        (None, Some(_)) => false,
    }
}