// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::assert_batches_eq;
use datafusion::catalog::memory::DataSourceExec;
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion::common::DFSchema;
use datafusion::common::{Result, ScalarValue};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::execution::context::SessionContext;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_expr::expressions::{CastExpr, Column, Literal};
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{lit, SessionConfig};
use datafusion_physical_expr_adapter::{
DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
};
use futures::StreamExt;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};
// Example showing how to implement custom default value handling for missing columns
// using field metadata and PhysicalExprAdapter.
//
// This example demonstrates how to:
// 1. Store default values in field metadata under a well-known key
// 2. Create a custom PhysicalExprAdapter that reads these defaults
// 3. Inject default values for missing columns in filter predicates
// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
// 5. Wrap string default values in cast expressions for proper type conversion
//
// Important: PhysicalExprAdapter is specifically designed for rewriting filter predicates
// that get pushed down to file scans. Missing columns in projections are handled by
// other mechanisms in DataFusion (such as SchemaAdapter).
//
// The metadata-based approach provides a flexible way to store default values as strings
// and cast them to the appropriate types at query time.

/// Metadata key for storing default values in field metadata
const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";
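
// Rough sketch of the rewrite performed below (illustrative only): a pushed-down
// predicate such as
//
//     status = 'active'      -- `status` is missing from the Parquet file
//
// has the missing column reference replaced by the default stored in the logical
// field's metadata, becoming
//
//     'active' = 'active'    -- always true, so every row passes the filter
//
// For a non-Utf8 column such as `priority` (default "1"), the injected literal is
// additionally wrapped in a cast: CAST('1' AS Int32).
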
#[tokio::main]
async fn main() -> Result<()> {
println!("=== Creating example data with missing columns and default values ===");
// Create sample data where the logical schema has more columns than the physical schema
let (logical_schema, physical_schema, batch) = create_sample_data_with_defaults();
let store = InMemory::new();
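    // Serialize the sample batch (which only has the physical columns) to an
    // in-memory Parquet file, limiting each row group to two rows.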
let buf = {
let mut buf = vec![];
let props = WriterProperties::builder()
.set_max_row_group_size(2)
.build();
let mut writer =
ArrowWriter::try_new(&mut buf, physical_schema.clone(), Some(props))
.expect("creating writer");
writer.write(&batch).expect("Writing batch");
writer.close().unwrap();
buf
};
let path = Path::from("example.parquet");
let payload = PutPayload::from_bytes(buf.into());
store.put(&path, payload).await?;
// Create a custom table provider that handles missing columns with defaults
let table_provider = Arc::new(DefaultValueTableProvider::new(logical_schema));
// Set up query execution
let mut cfg = SessionConfig::new();
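    // Enable Parquet filter pushdown so predicates reach the file scan, where our
    // custom PhysicalExprAdapter gets a chance to rewrite them against the
    // physical file schema.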
cfg.options_mut().execution.parquet.pushdown_filters = true;
let ctx = SessionContext::new_with_config(cfg);
// Register our table
ctx.register_table("example_table", table_provider)?;
ctx.runtime_env().register_object_store(
ObjectStoreUrl::parse("memory://")?.as_ref(),
Arc::new(store),
);
println!("\n=== Demonstrating default value injection in filter predicates ===");
let query = "SELECT id, name FROM example_table WHERE status = 'active' ORDER BY id";
println!("Query: {query}");
println!("Note: The 'status' column doesn't exist in the physical schema,");
println!(
"but our adapter injects the default value 'active' for the filter predicate."
);
let batches = ctx.sql(query).await?.collect().await?;
#[rustfmt::skip]
let expected = [
"+----+-------+",
"| id | name |",
"+----+-------+",
"| 1 | Alice |",
"| 2 | Bob |",
"| 3 | Carol |",
"+----+-------+",
];
arrow::util::pretty::print_batches(&batches)?;
assert_batches_eq!(expected, &batches);
println!("\n=== Key Insight ===");
println!("This example demonstrates how PhysicalExprAdapter works:");
println!("1. Physical schema only has 'id' and 'name' columns");
println!("2. Logical schema has 'id', 'name', 'status', and 'priority' columns with defaults");
println!("3. Our custom adapter intercepts filter expressions on missing columns");
println!("4. Default values from metadata are injected as cast expressions");
println!("5. The DefaultPhysicalExprAdapter handles other schema adaptations");
println!("\nNote: PhysicalExprAdapter is specifically for filter predicates.");
println!("For projection columns, different mechanisms handle missing columns.");
Ok(())
}
/// Create sample data with a logical schema that has default values in metadata
/// and a physical schema that's missing some columns
fn create_sample_data_with_defaults() -> (SchemaRef, SchemaRef, RecordBatch) {
// Create metadata for default values
let mut status_metadata = HashMap::new();
status_metadata.insert(DEFAULT_VALUE_METADATA_KEY.to_string(), "active".to_string());
let mut priority_metadata = HashMap::new();
priority_metadata.insert(DEFAULT_VALUE_METADATA_KEY.to_string(), "1".to_string());
// The logical schema includes all columns with their default values in metadata
// Note: We make the columns with defaults nullable to allow the default adapter to handle them
let logical_schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
Field::new("status", DataType::Utf8, true).with_metadata(status_metadata),
Field::new("priority", DataType::Int32, true).with_metadata(priority_metadata),
]);
// The physical schema only has some columns (simulating missing columns in storage)
let physical_schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]);
// Create sample data for the physical schema
let batch = RecordBatch::try_new(
Arc::new(physical_schema.clone()),
vec![
Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
Arc::new(arrow::array::StringArray::from(vec![
"Alice", "Bob", "Carol",
])),
],
)
.unwrap();
(Arc::new(logical_schema), Arc::new(physical_schema), batch)
}
/// Custom TableProvider that uses DefaultValuePhysicalExprAdapter
#[derive(Debug)]
struct DefaultValueTableProvider {
schema: SchemaRef,
}
impl DefaultValueTableProvider {
fn new(schema: SchemaRef) -> Self {
Self { schema }
}
}
#[async_trait]
impl TableProvider for DefaultValueTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn table_type(&self) -> TableType {
TableType::Base
}
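    /// Report every filter as Inexact: the scan may evaluate them early, but
    /// DataFusion still re-applies the filters above the scan, so correctness does
    /// not depend on the pushed-down rewrite catching every case.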
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let schema = self.schema.clone();
let df_schema = DFSchema::try_from(schema.clone())?;
let filter = state.create_physical_expr(
conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true)),
&df_schema,
)?;
let parquet_source = ParquetSource::default()
.with_predicate(filter)
.with_pushdown_filters(true);
let object_store_url = ObjectStoreUrl::parse("memory://")?;
let store = state.runtime_env().object_store(object_store_url)?;
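        // Discover all files in the in-memory object store; listing errors are
        // silently skipped to keep the example small.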
let mut files = vec![];
let mut listing = store.list(None);
while let Some(file) = listing.next().await {
if let Ok(file) = file {
files.push(file);
}
}
let file_group = files
.iter()
.map(|file| PartitionedFile::new(file.location.clone(), file.size))
.collect();
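        // Build the file scan, wiring in our custom expression adapter factory so
        // that pushed-down predicates are rewritten against each file's physical
        // schema before evaluation.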
let file_scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse("memory://")?,
self.schema.clone(),
Arc::new(parquet_source),
)
.with_projection_indices(projection.cloned())
.with_limit(limit)
.with_file_group(file_group)
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));
Ok(Arc::new(DataSourceExec::new(Arc::new(
file_scan_config.build(),
))))
}
}
/// Factory for creating DefaultValuePhysicalExprAdapter instances
#[derive(Debug)]
struct DefaultValuePhysicalExprAdapterFactory;
impl PhysicalExprAdapterFactory for DefaultValuePhysicalExprAdapterFactory {
fn create(
&self,
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
) -> Arc<dyn PhysicalExprAdapter> {
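        // Build the stock DefaultPhysicalExprAdapter up front; our adapter delegates
        // to it after injecting the metadata-driven defaults.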
let default_factory = DefaultPhysicalExprAdapterFactory;
let default_adapter = default_factory
.create(logical_file_schema.clone(), physical_file_schema.clone());
Arc::new(DefaultValuePhysicalExprAdapter {
logical_file_schema,
physical_file_schema,
default_adapter,
partition_values: Vec::new(),
})
}
}
/// Custom PhysicalExprAdapter that handles missing columns with default values from metadata
/// and wraps DefaultPhysicalExprAdapter for standard schema adaptation
#[derive(Debug)]
struct DefaultValuePhysicalExprAdapter {
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
default_adapter: Arc<dyn PhysicalExprAdapter>,
partition_values: Vec<(FieldRef, ScalarValue)>,
}
impl PhysicalExprAdapter for DefaultValuePhysicalExprAdapter {
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
// First try our custom default value injection for missing columns
let rewritten = expr
.transform(|expr| {
self.inject_default_values(
expr,
&self.logical_file_schema,
&self.physical_file_schema,
)
})
.data()?;
// Then apply the default adapter as a fallback to handle standard schema differences
// like type casting, partition column handling, etc.
let default_adapter = if !self.partition_values.is_empty() {
self.default_adapter
.with_partition_values(self.partition_values.clone())
} else {
self.default_adapter.clone()
};
default_adapter.rewrite(rewritten)
}
fn with_partition_values(
&self,
partition_values: Vec<(FieldRef, ScalarValue)>,
) -> Arc<dyn PhysicalExprAdapter> {
Arc::new(DefaultValuePhysicalExprAdapter {
logical_file_schema: self.logical_file_schema.clone(),
physical_file_schema: self.physical_file_schema.clone(),
default_adapter: self.default_adapter.clone(),
partition_values,
})
}
}
impl DefaultValuePhysicalExprAdapter {
fn inject_default_values(
&self,
expr: Arc<dyn PhysicalExpr>,
logical_file_schema: &Schema,
physical_file_schema: &Schema,
) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
if let Some(column) = expr.as_any().downcast_ref::<Column>() {
let column_name = column.name();
// Check if this column exists in the physical schema
if physical_file_schema.index_of(column_name).is_err() {
// Column is missing from physical schema, check if logical schema has a default
if let Ok(logical_field) =
logical_file_schema.field_with_name(column_name)
{
if let Some(default_value_str) =
logical_field.metadata().get(DEFAULT_VALUE_METADATA_KEY)
{
// Create a string literal and wrap it in a cast expression
let default_literal = self.create_default_value_expr(
default_value_str,
logical_field.data_type(),
)?;
return Ok(Transformed::yes(default_literal));
}
}
}
}
// No transformation needed
Ok(Transformed::no(expr))
}
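    /// Build the literal used in place of a missing column: the string default is
    /// returned directly for Utf8 targets, otherwise it is wrapped in a cast to the
    /// logical field's data type.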
fn create_default_value_expr(
&self,
value_str: &str,
data_type: &DataType,
) -> Result<Arc<dyn PhysicalExpr>> {
// Create a string literal with the default value
let string_literal =
Arc::new(Literal::new(ScalarValue::Utf8(Some(value_str.to_string()))));
// If the target type is already Utf8, return the string literal directly
if matches!(data_type, DataType::Utf8) {
return Ok(string_literal);
}
// Otherwise, wrap the string literal in a cast expression
let cast_expr = Arc::new(CastExpr::new(string_literal, data_type.clone(), None));
Ok(cast_expr)
}
}