blob: 9a0cb494e495f4a89ad59f6306f79784e2167421 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::sync::Arc;
use arrow::datatypes::{Schema, SchemaRef};
use datafusion_common::{
Result, ScalarValue,
tree_node::{Transformed, TransformedResult, TreeNode},
};
use datafusion_physical_expr::{
expressions::{Column, Literal},
projection::{ProjectionExpr, ProjectionExprs},
};
use futures::{FutureExt, StreamExt};
use itertools::Itertools;
use crate::{
PartitionedFile, TableSchema,
file_stream::{FileOpenFuture, FileOpener},
};
/// A file opener that handles applying a projection on top of an inner opener.
///
/// This includes handling partition columns.
///
/// Any projection pushed down will be split up into:
/// - Simple column indices / column selection
/// - A remainder projection that this opener applies on top of it
///
/// This is meant to simplify projection pushdown for sources like CSV
/// that can only handle "simple" column selection.
pub struct ProjectionOpener {
    /// The wrapped opener; `open` delegates to it to obtain the batch stream.
    inner: Arc<dyn FileOpener>,
    /// The remainder projection evaluated against every batch the inner opener yields.
    projection: ProjectionExprs,
    /// The file schema projected down to the selected file indices; the
    /// projector for `projection` is built against this schema.
    input_schema: SchemaRef,
    /// Mappings used to replace partition column references in `projection`
    /// with literals taken from each file's partition values.
    partition_columns: Vec<PartitionColumnIndex>,
}
impl ProjectionOpener {
pub fn try_new(
projection: SplitProjection,
inner: Arc<dyn FileOpener>,
file_schema: &Schema,
) -> Result<Arc<dyn FileOpener>> {
Ok(Arc::new(ProjectionOpener {
inner,
projection: projection.remapped_projection,
input_schema: Arc::new(file_schema.project(&projection.file_indices)?),
partition_columns: projection.partition_columns,
}))
}
}
impl FileOpener for ProjectionOpener {
    /// Opens `partitioned_file` with the inner opener and applies the
    /// remainder projection to every batch of the resulting stream.
    ///
    /// When the projection references partition columns, those references are
    /// first replaced with literals built from the file's partition values.
    ///
    /// # Errors
    ///
    /// Returns an error if the projector cannot be built against the input
    /// schema or if the inner opener fails to open the file. Errors raised
    /// while awaiting the inner stream or while projecting a batch are
    /// surfaced through the returned future / stream.
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        // Modify any references to partition columns in the projection expressions
        // and substitute them with literal values from PartitionedFile.partition_values
        let projection = if self.partition_columns.is_empty() {
            // No partition columns are referenced, so the partition values are
            // never needed and are not cloned.
            self.projection.clone()
        } else {
            inject_partition_columns_into_projection(
                &self.projection,
                &self.partition_columns,
                // Cloned because `partitioned_file` is moved into the inner
                // opener below.
                partitioned_file.partition_values.clone(),
            )
        };
        let projector = projection.make_projector(&self.input_schema)?;
        let inner = self.inner.open(partitioned_file)?;
        Ok(async move {
            let stream = inner.await?;
            let stream = stream.map(move |batch| {
                let batch = batch?;
                let batch = projector.project_batch(&batch)?;
                Ok(batch)
            });
            Ok(stream.boxed())
        }
        .boxed())
    }
}
/// Links one partition column referenced by a remainder projection to the
/// partition value that replaces it.
///
/// A column reference whose index equals `in_remainder_projection` is
/// substituted with a literal built from the partition value at
/// `in_partition_values`.
#[derive(Debug, Clone, Copy)]
pub struct PartitionColumnIndex {
    /// The index of this partition column in the remainder projection (>= num_file_columns)
    pub in_remainder_projection: usize,
    /// The index of this partition column in the partition_values array
    pub in_partition_values: usize,
}
/// Returns a copy of `projection` in which every column reference that maps
/// to a partition column is replaced by a literal holding that column's value.
///
/// A column is treated as a partition column when its index matches the
/// `in_remainder_projection` of an entry in `partition_columns`; the literal
/// is taken from `partition_values` at that entry's `in_partition_values`.
/// All other expressions are left untouched and aliases are preserved.
///
/// # Panics
///
/// Panics if an entry's `in_partition_values` is out of bounds for
/// `partition_values`.
fn inject_partition_columns_into_projection(
    projection: &ProjectionExprs,
    partition_columns: &[PartitionColumnIndex],
    partition_values: Vec<ScalarValue>,
) -> ProjectionExprs {
    // Wrap each partition value in a shared literal once, so repeated
    // references only bump a reference count instead of cloning the scalar.
    let literals = partition_values
        .into_iter()
        .map(|scalar| Arc::new(Literal::new(scalar)))
        .collect::<Vec<Arc<Literal>>>();

    let mut rewritten = Vec::new();
    for item in projection.iter() {
        let expr = Arc::clone(&item.expr)
            .transform(|node| {
                // Resolve the node to a partition literal, if it is a column
                // whose index belongs to a partition column.
                let replacement = node
                    .as_any()
                    .downcast_ref::<Column>()
                    .and_then(|column| {
                        partition_columns
                            .iter()
                            .find(|pci| pci.in_remainder_projection == column.index())
                    })
                    .map(|pci| Arc::clone(&literals[pci.in_partition_values]));

                match replacement {
                    Some(literal) => Ok(Transformed::yes(literal)),
                    None => Ok(Transformed::no(node)),
                }
            })
            .data()
            .expect("infallible transform");
        rewritten.push(ProjectionExpr::new(expr, item.alias.clone()));
    }

    ProjectionExprs::new(rewritten)
}
/// At a high level the goal of SplitProjection is to take a ProjectionExprs meant to be applied to the table schema
/// and split that into:
/// - The projection indices into the file schema (`file_indices`)
/// - The partition column mappings (`partition_columns`), which pre-compute, for each referenced partition column,
///   both its index in the input of the remapped projection and its index into the partition values array
/// - A remapped projection (`remapped_projection`) that can be applied after the file projection is applied
///
/// The remapped projection has the following properties:
/// - Column indices referring to file columns are remapped to `[0..file_indices.len())`
/// - Column indices referring to partition columns are remapped to `[file_indices.len()..)`
///
/// This allows the ProjectionOpener to easily identify which columns in the remapped projection
/// refer to partition columns and substitute them with literals from the partition values.
#[derive(Debug, Clone)]
pub struct SplitProjection {
    /// The original projection this [`SplitProjection`] was derived from
    pub source: ProjectionExprs,
    /// Column indices to read from file (public for file sources)
    pub file_indices: Vec<usize>,
    /// Pre-computed partition column mappings (internal, used by ProjectionOpener)
    pub(crate) partition_columns: Vec<PartitionColumnIndex>,
    /// The remapped projection (internal, used by ProjectionOpener)
    pub(crate) remapped_projection: ProjectionExprs,
}
impl SplitProjection {
    /// Builds the [`SplitProjection`] for a projection that selects every
    /// column of the table schema, in table-schema order.
    pub fn unprojected(table_schema: &TableSchema) -> Self {
        let full_schema = table_schema.table_schema();
        let every_index: Vec<usize> = (0..full_schema.fields().len()).collect();
        let identity = ProjectionExprs::from_indices(&every_index, full_schema);
        Self::new(table_schema.file_schema(), &identity)
    }

    /// Creates a new [`SplitProjection`] by splitting a projection into
    /// simple file column indices and a remainder projection that is applied after reading the file.
    ///
    /// In other words: we get a `Vec<usize>` projection that is meant to be applied on top of
    /// `logical_file_schema` and a remainder projection that is applied to the result of that
    /// first projection.
    ///
    /// Here `logical_file_schema` is expected to be the *logical* schema of the file, that is the
    /// table schema minus any partition columns.
    /// Partition columns are always expected to be at the end of the table schema.
    /// Note that `logical_file_schema` is *not* the physical schema of the file.
    pub fn new(logical_file_schema: &Schema, projection: &ProjectionExprs) -> Self {
        let file_column_count = logical_file_schema.fields().len();

        // Gather every distinct column reference as (table index -> name).
        // The ordered map hands the columns back in ascending index order;
        // the first name seen for an index wins.
        let mut referenced = std::collections::BTreeMap::new();
        for proj_expr in projection {
            proj_expr
                .expr
                .apply(|node| {
                    if let Some(column) = node.as_any().downcast_ref::<Column>() {
                        referenced
                            .entry(column.index())
                            .or_insert_with(|| column.name().to_string());
                    }
                    Ok(datafusion_common::tree_node::TreeNodeRecursion::Continue)
                })
                .expect("infallible apply");
        }

        // Walk the columns in ascending table index. Every file column
        // (index < file_column_count) therefore precedes every partition
        // column, so the position in this walk is exactly the remapped index:
        // file columns land in [0..num_file_columns) and partition columns in
        // [num_file_columns..).
        let mut file_indices = Vec::new();
        let mut partition_table_indices = Vec::new();
        let mut remapped_columns = std::collections::HashMap::new();
        for (position, (table_index, name)) in referenced.into_iter().enumerate() {
            if table_index < file_column_count {
                file_indices.push(table_index);
            } else {
                partition_table_indices.push(table_index);
            }
            // Create the remapped column once so all references share the same Arc.
            let shared: Arc<dyn datafusion_physical_plan::PhysicalExpr> =
                Arc::new(Column::new(&name, position));
            remapped_columns.insert(table_index, shared);
        }

        // Rewrite each projection expression in a single pass, swapping column
        // references for their pre-built remapped counterparts.
        let remapped = projection
            .iter()
            .map(|proj_expr| {
                let expr = Arc::clone(&proj_expr.expr)
                    .transform(|node| {
                        let replacement = node
                            .as_any()
                            .downcast_ref::<Column>()
                            .and_then(|column| remapped_columns.get(&column.index()))
                            .map(Arc::clone);
                        Ok(match replacement {
                            Some(new_column) => Transformed::yes(new_column),
                            None => Transformed::no(node),
                        })
                    })
                    .data()
                    .expect("infallible transform");
                ProjectionExpr::new(expr, proj_expr.alias.clone())
            })
            .collect::<Vec<_>>();

        // Pre-compute, for each partition column, where it sits in the
        // remainder projection's input and in the partition values array.
        let first_partition_slot = file_indices.len();
        let partition_columns = partition_table_indices
            .into_iter()
            .enumerate()
            .map(|(offset, table_index)| PartitionColumnIndex {
                in_remainder_projection: first_partition_slot + offset,
                in_partition_values: table_index - file_column_count,
            })
            .collect::<Vec<_>>();

        Self {
            source: projection.clone(),
            file_indices,
            partition_columns,
            remapped_projection: ProjectionExprs::from(remapped),
        }
    }
}
#[cfg(test)]
mod test {
use std::sync::Arc;
use arrow::array::AsArray;
use arrow::datatypes::{DataType, SchemaRef};
use datafusion_common::{DFSchema, ScalarValue, record_batch};
use datafusion_expr::{Expr, col, execution_props::ExecutionProps};
use datafusion_physical_expr::{create_physical_exprs, projection::ProjectionExpr};
use itertools::Itertools;
use super::*;
/// Converts logical `exprs` into a [`ProjectionExprs`] planned against
/// `schema`, aliasing the expressions `col0`, `col1`, ... in input order.
fn create_projection_exprs<'a>(
    exprs: impl IntoIterator<Item = &'a Expr>,
    schema: &SchemaRef,
) -> ProjectionExprs {
    let df_schema = DFSchema::try_from(Arc::clone(schema)).unwrap();
    let execution_props = ExecutionProps::default();
    let planned = create_physical_exprs(exprs, &df_schema, &execution_props).unwrap();

    let mut named = Vec::new();
    for (position, physical) in planned.into_iter().enumerate() {
        named.push(ProjectionExpr::new(physical, format!("col{position}")));
    }
    ProjectionExprs::from(named)
}
#[test]
fn test_split_projection_with_partition_columns() {
    use arrow::datatypes::Field;

    // Simulate the avro_exec_with_partition test scenario:
    // file_schema has 3 fields
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("bool_col", DataType::Boolean, false),
        Field::new("tinyint_col", DataType::Int8, false),
    ]));
    // table_schema has 4 fields (3 file + 1 partition)
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("bool_col", DataType::Boolean, false),
        Field::new("tinyint_col", DataType::Int8, false),
        Field::new("date", DataType::Utf8, false), // partition column at index 3
    ]));

    // projection indices: [0, 1, 3, 2]
    // This should select: id (0), bool_col (1), date (3-partition), tinyint_col (2)
    let projection_indices = vec![0, 1, 3, 2];
    let projection = ProjectionExprs::from_indices(&projection_indices, &table_schema);

    // Split the projection into file columns and partition columns
    let split = SplitProjection::new(&file_schema, &projection);

    // All three file columns are needed
    assert_eq!(split.file_indices, vec![0, 1, 2]);
    // One partition column: it reads partition value 0 and sits right after
    // the three file columns in the remainder projection's input
    assert_eq!(split.partition_columns.len(), 1);
    assert_eq!(split.partition_columns[0].in_partition_values, 0);
    assert_eq!(split.partition_columns[0].in_remainder_projection, 3);

    // A batch holding only the file columns, as the inner opener would produce
    let file_batch = record_batch!(
        ("id", Int32, vec![4]),
        ("bool_col", Boolean, vec![true]),
        ("tinyint_col", Int8, vec![0])
    )
    .unwrap();

    // Inject the partition value using the mapping computed by the split itself
    // (replaces Column(3) with a literal), so the mapping is exercised end to end
    let partition_values = vec![ScalarValue::from("2021-10-26")];
    let injected_projection = inject_partition_columns_into_projection(
        &split.remapped_projection,
        &split.partition_columns,
        partition_values,
    );

    // Now the projection should work on the file batch
    let projector = injected_projection
        .make_projector(&file_batch.schema())
        .unwrap();
    let result = projector.project_batch(&file_batch).unwrap();

    // Verify the output has the correct column order: id, bool_col, date, tinyint_col
    assert_eq!(result.num_columns(), 4);
    assert_eq!(
        result
            .column(0)
            .as_primitive::<arrow::datatypes::Int32Type>()
            .value(0),
        4
    );
    assert!(result.column(1).as_boolean().value(0));
    assert_eq!(result.column(2).as_string::<i32>().value(0), "2021-10-26");
    assert_eq!(
        result
            .column(3)
            .as_primitive::<arrow::datatypes::Int8Type>()
            .value(0),
        0
    );
}
// ========================================================================
// Comprehensive Test Suite for SplitProjection
// ========================================================================
// Builds a `(file_schema, table_schema)` pair for tests.
//
// The file schema holds `file_cols` non-nullable Int32 fields named `col_{i}`;
// the table schema holds those same fields followed by `partition_cols`
// non-nullable Utf8 fields named `part_{i}`.
fn create_test_schemas(
    file_cols: usize,
    partition_cols: usize,
) -> (SchemaRef, SchemaRef) {
    use arrow::datatypes::Field;

    let mut file_fields = Vec::with_capacity(file_cols);
    for i in 0..file_cols {
        file_fields.push(Field::new(format!("col_{i}"), DataType::Int32, false));
    }

    let partition_fields =
        (0..partition_cols).map(|i| Field::new(format!("part_{i}"), DataType::Utf8, false));
    let table_fields: Vec<Field> = file_fields
        .iter()
        .cloned()
        .chain(partition_fields)
        .collect();

    let file_schema = Arc::new(Schema::new(file_fields));
    let table_schema = Arc::new(Schema::new(table_fields));
    (file_schema, table_schema)
}
// ========================================================================
// Partition Column Handling Tests
// ========================================================================
#[test]
fn test_split_projection_only_file_columns() {
    // Three file columns (0, 1, 2) followed by two partition columns (3, 4).
    let (file_schema, table_schema) = create_test_schemas(3, 2);

    // Project exactly the file columns.
    let file_only = ProjectionExprs::from_indices(&[0, 1, 2], &table_schema);
    let split = SplitProjection::new(&file_schema, &file_only);

    assert_eq!(split.file_indices, vec![0, 1, 2]);
    assert!(split.partition_columns.is_empty());
}
#[test]
fn test_split_projection_only_partition_columns() {
    // Three file columns (0, 1, 2) followed by two partition columns (3, 4).
    let (file_schema, table_schema) = create_test_schemas(3, 2);

    // Project exactly the partition columns.
    let partition_only = ProjectionExprs::from_indices(&[3, 4], &table_schema);
    let split = SplitProjection::new(&file_schema, &partition_only);

    assert!(split.file_indices.is_empty());
    let value_slots: Vec<usize> = split
        .partition_columns
        .iter()
        .map(|pci| pci.in_partition_values)
        .collect();
    assert_eq!(value_slots, vec![0, 1]);
}
#[test]
fn test_split_projection_multiple_partition_columns() {
    let (file_schema, table_schema) = create_test_schemas(2, 3);
    // File cols: 0, 1; Partition cols: 2, 3, 4
    // Select: [0, 2, 4, 1, 3] (mixed file and partition)
    let projection = ProjectionExprs::from_indices(&[0, 2, 4, 1, 3], &table_schema);
    let split = SplitProjection::new(&file_schema, &projection);

    assert_eq!(split.file_indices, vec![0, 1]);
    assert_eq!(split.partition_columns.len(), 3);
    assert_eq!(split.partition_columns[0].in_partition_values, 0);
    assert_eq!(split.partition_columns[1].in_partition_values, 1);
    assert_eq!(split.partition_columns[2].in_partition_values, 2);

    // Verify the remapped indices: file columns occupy [0, 1], so the
    // partition columns must follow at [2, 3, 4]
    assert_eq!(split.partition_columns[0].in_remainder_projection, 2);
    assert_eq!(split.partition_columns[1].in_remainder_projection, 3);
    assert_eq!(split.partition_columns[2].in_remainder_projection, 4);

    // The remapped projection keeps one expression per selected column
    assert_eq!(split.remapped_projection.iter().count(), 5);
}
#[test]
fn test_split_projection_partition_columns_reverse_order() {
    // Two file columns (0, 1) followed by two partition columns (2, 3).
    let (file_schema, table_schema) = create_test_schemas(2, 2);

    // Project the partition columns in reverse: [3, 2].
    let reversed = ProjectionExprs::from_indices(&[3, 2], &table_schema);
    let split = SplitProjection::new(&file_schema, &reversed);

    assert!(split.file_indices.is_empty());
    // The mappings are ordered by table index regardless of projection order.
    let value_slots: Vec<usize> = split
        .partition_columns
        .iter()
        .map(|pci| pci.in_partition_values)
        .collect();
    assert_eq!(value_slots, vec![0, 1]);
}
#[test]
fn test_split_projection_interleaved_file_and_partition() {
    // Three file columns (0, 1, 2) followed by three partition columns (3, 4, 5).
    let (file_schema, table_schema) = create_test_schemas(3, 3);

    // Alternate between file and partition columns.
    let alternating: [usize; 6] = [0, 3, 1, 4, 2, 5];
    let projection = ProjectionExprs::from_indices(&alternating, &table_schema);
    let split = SplitProjection::new(&file_schema, &projection);

    assert_eq!(split.file_indices, vec![0, 1, 2]);
    let value_slots: Vec<usize> = split
        .partition_columns
        .iter()
        .map(|pci| pci.in_partition_values)
        .collect();
    assert_eq!(value_slots, vec![0, 1, 2]);
}
#[test]
fn test_split_projection_expression_with_file_and_partition_columns() {
    use arrow::datatypes::Field;

    // Create schemas: 2 file columns, 1 partition column
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("file_a", DataType::Int32, false),
        Field::new("file_b", DataType::Int32, false),
    ]));
    let table_schema = Arc::new(Schema::new(vec![
        Field::new("file_a", DataType::Int32, false),
        Field::new("file_b", DataType::Int32, false),
        Field::new("part_c", DataType::Int32, false),
    ]));

    // Create expression: file_a + part_c
    let exprs = [col("file_a") + col("part_c")];
    let projection = create_projection_exprs(exprs.iter(), &table_schema);
    let split = SplitProjection::new(&file_schema, &projection);

    // Should extract both columns; file_b is not referenced and is not read
    assert_eq!(split.file_indices, vec![0]);
    assert_eq!(split.partition_columns.len(), 1);
    assert_eq!(split.partition_columns[0].in_partition_values, 0);
    // Only one file column is read, so part_c (table index 2) is remapped
    // to index 1 in the remainder projection's input
    assert_eq!(split.partition_columns[0].in_remainder_projection, 1);
}
// ========================================================================
// Boundary Condition Tests
// ========================================================================
#[test]
fn test_split_projection_boundary_last_file_column() {
    // Three file columns (0, 1, 2) followed by two partition columns (3, 4).
    let (file_schema, table_schema) = create_test_schemas(3, 2);

    // Index 2 is the highest index that still refers to a file column.
    let last_file_column = ProjectionExprs::from_indices(&[2], &table_schema);
    let split = SplitProjection::new(&file_schema, &last_file_column);

    assert_eq!(split.file_indices, vec![2]);
    assert!(split.partition_columns.is_empty());
}
#[test]
fn test_split_projection_boundary_first_partition_column() {
    // Three file columns (0, 1, 2) followed by two partition columns (3, 4).
    let (file_schema, table_schema) = create_test_schemas(3, 2);

    // Index 3 is the lowest index that refers to a partition column.
    let first_partition_column = ProjectionExprs::from_indices(&[3], &table_schema);
    let split = SplitProjection::new(&file_schema, &first_partition_column);

    assert!(split.file_indices.is_empty());
    let value_slots: Vec<usize> = split
        .partition_columns
        .iter()
        .map(|pci| pci.in_partition_values)
        .collect();
    assert_eq!(value_slots, vec![0]);
}
// ========================================================================
// Integration Tests
// ========================================================================
#[test]
fn test_inject_partition_columns_multiple_partitions() {
    // A batch holding only the two file columns.
    let data =
        record_batch!(("col_0", Int32, vec![1]), ("col_1", Int32, vec![2])).unwrap();

    // Two file columns (0, 1) followed by two partition columns (2, 3).
    let (file_schema, table_schema) = create_test_schemas(2, 2);
    // Projection: [0, 2, 1, 3] = [file_0, part_0, file_1, part_1]
    let projection = ProjectionExprs::from_indices(&[0, 2, 1, 3], &table_schema);
    let split = SplitProjection::new(&file_schema, &projection);

    // Hand-written mappings: the partition columns sit at indices 2 and 3 of
    // the remainder projection and read partition values 0 and 1.
    let first_partition = PartitionColumnIndex {
        in_remainder_projection: 2,
        in_partition_values: 0,
    };
    let second_partition = PartitionColumnIndex {
        in_remainder_projection: 3,
        in_partition_values: 1,
    };
    let partition_columns = vec![first_partition, second_partition];
    let partition_values =
        vec![ScalarValue::from("part_a"), ScalarValue::from("part_b")];

    let injected = inject_partition_columns_into_projection(
        &split.remapped_projection,
        &partition_columns,
        partition_values,
    );

    // Evaluate the injected projection against the file-only batch.
    let projector = injected.make_projector(&data.schema()).unwrap();
    let result = projector.project_batch(&data).unwrap();
    assert_eq!(result.num_columns(), 4);

    // Output order follows the projection: file_0, part_0, file_1, part_1.
    let file_0 = result
        .column(0)
        .as_primitive::<arrow::datatypes::Int32Type>();
    assert_eq!(file_0.value(0), 1);

    let part_0 = result.column(1).as_string::<i32>();
    assert_eq!(part_0.value(0), "part_a");

    let file_1 = result
        .column(2)
        .as_primitive::<arrow::datatypes::Int32Type>();
    assert_eq!(file_1.value(0), 2);

    let part_1 = result.column(3).as_string::<i32>();
    assert_eq!(part_1.value(0), "part_b");
}
}