// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! See `main.rs` for how to run it.

use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::assert_batches_eq;
use datafusion::catalog::memory::DataSourceExec;
use datafusion::catalog::{Session, TableProvider};
use datafusion::common::DFSchema;
use datafusion::common::{Result, ScalarValue};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::execution::context::SessionContext;
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{SessionConfig, lit};
use datafusion_physical_expr_adapter::{
DefaultPhysicalExprAdapterFactory, PhysicalExprAdapter, PhysicalExprAdapterFactory,
replace_columns_with_literals,
};
use futures::StreamExt;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::{ObjectStore, PutPayload};

// Metadata key under which a field stores its default value (as a string)
const DEFAULT_VALUE_METADATA_KEY: &str = "example.default_value";

/// Example showing how to implement custom default value handling for missing columns
/// using field metadata and PhysicalExprAdapter.
///
/// This example demonstrates how to:
/// 1. Store default values in field metadata using a constant key
/// 2. Create a custom PhysicalExprAdapter that reads these defaults
/// 3. Inject default values for missing columns in filter predicates using `replace_columns_with_literals`
/// 4. Use the DefaultPhysicalExprAdapter as a fallback for standard schema adaptation
/// 5. Convert string default values to proper types using `ScalarValue::cast_to()` at planning time
///
/// Important: `PhysicalExprAdapter` rewrites both filter predicates and projection
/// expressions for file scans, including references to missing columns.
///
/// The metadata-based approach provides a flexible way to store default values as strings
/// and cast them to the appropriate types at planning time, avoiding runtime overhead.
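///
/// For instance, a sketch of the cast each missing column's default goes
/// through in `rewrite` below (string default -> field type):
///
/// ```rust,ignore
/// let v = ScalarValue::Utf8(Some("1".to_string())).cast_to(&DataType::Int32)?;
/// assert_eq!(v, ScalarValue::Int32(Some(1)));
/// ```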
pub async fn default_column_values() -> Result<()> {
println!("=== Creating example data with missing columns and default values ===");
// Create sample data where the logical schema has more columns than the physical schema
let (logical_schema, physical_schema, batch) = create_sample_data_with_defaults();
let store = InMemory::new();
let buf = {
let mut buf = vec![];
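        // Keep row groups tiny (2 rows) so the 3-row file spans multiple
        // row groups rather than fitting in one.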
let props = WriterProperties::builder()
.set_max_row_group_size(2)
.build();
let mut writer =
ArrowWriter::try_new(&mut buf, physical_schema.clone(), Some(props))?;
writer.write(&batch)?;
writer.close()?;
buf
};
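    // Upload the encoded Parquet file to the in-memory object store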
let path = Path::from("example.parquet");
let payload = PutPayload::from_bytes(buf.into());
store.put(&path, payload).await?;
// Create a custom table provider that handles missing columns with defaults
let table_provider = Arc::new(DefaultValueTableProvider::new(logical_schema));
// Set up query execution
let mut cfg = SessionConfig::new();
cfg.options_mut().execution.parquet.pushdown_filters = true;
let ctx = SessionContext::new_with_config(cfg);
// Register our table
ctx.register_table("example_table", table_provider)?;
ctx.runtime_env().register_object_store(
ObjectStoreUrl::parse("memory://")?.as_ref(),
Arc::new(store),
);
println!("\n=== Demonstrating default value injection in filter predicates ===");
let query = "SELECT id, name FROM example_table WHERE status = 'active' ORDER BY id";
println!("Query: {query}");
println!("Note: The 'status' column doesn't exist in the physical schema,");
println!(
"but our adapter injects the default value 'active' for the filter predicate."
);
let batches = ctx.sql(query).await?.collect().await?;
#[rustfmt::skip]
let expected = [
"+----+-------+",
"| id | name |",
"+----+-------+",
"| 1 | Alice |",
"| 2 | Bob |",
"| 3 | Carol |",
"+----+-------+",
];
arrow::util::pretty::print_batches(&batches)?;
assert_batches_eq!(expected, &batches);
println!("\n=== Key Insight ===");
println!("This example demonstrates how PhysicalExprAdapter works:");
println!("1. Physical schema only has 'id' and 'name' columns");
println!(
"2. Logical schema has 'id', 'name', 'status', and 'priority' columns with defaults"
);
println!(
"3. Our custom adapter uses replace_columns_with_literals to inject default values"
);
println!("4. Default values from metadata are cast to proper types at planning time");
println!("5. The DefaultPhysicalExprAdapter handles other schema adaptations");
Ok(())
}

/// Create sample data with a logical schema that has default values in metadata
/// and a physical schema that's missing some columns
fn create_sample_data_with_defaults() -> (SchemaRef, SchemaRef, RecordBatch) {
// Create metadata for default values
let mut status_metadata = HashMap::new();
status_metadata.insert(DEFAULT_VALUE_METADATA_KEY.to_string(), "active".to_string());
let mut priority_metadata = HashMap::new();
priority_metadata.insert(DEFAULT_VALUE_METADATA_KEY.to_string(), "1".to_string());
// The logical schema includes all columns with their default values in metadata
// Note: We make the columns with defaults nullable to allow the default adapter to handle them
let logical_schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
Field::new("status", DataType::Utf8, true).with_metadata(status_metadata),
Field::new("priority", DataType::Int32, true).with_metadata(priority_metadata),
]);
// The physical schema only has some columns (simulating missing columns in storage)
let physical_schema = Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
]);
// Create sample data for the physical schema
let batch = RecordBatch::try_new(
Arc::new(physical_schema.clone()),
vec![
Arc::new(arrow::array::Int32Array::from(vec![1, 2, 3])),
Arc::new(arrow::array::StringArray::from(vec![
"Alice", "Bob", "Carol",
])),
],
)
.unwrap();
(Arc::new(logical_schema), Arc::new(physical_schema), batch)
}

/// Custom TableProvider that uses DefaultValuePhysicalExprAdapter
#[derive(Debug)]
struct DefaultValueTableProvider {
schema: SchemaRef,
}

impl DefaultValueTableProvider {
fn new(schema: SchemaRef) -> Self {
Self { schema }
}
}

#[async_trait]
impl TableProvider for DefaultValueTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
fn table_type(&self) -> TableType {
TableType::Base
}
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
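        // Inexact: the scan applies what it can, and DataFusion keeps a
        // FilterExec above the scan to re-check rows, so correctness never
        // depends on the pushdown succeeding.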
Ok(vec![TableProviderFilterPushDown::Inexact; filters.len()])
}
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
let schema = Arc::clone(&self.schema);
let df_schema = DFSchema::try_from(schema.clone())?;
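        // Fold all pushed-down filters into a single predicate; with no
        // filters this degenerates to the always-true literal.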
let filter = state.create_physical_expr(
conjunction(filters.iter().cloned()).unwrap_or_else(|| lit(true)),
&df_schema,
)?;
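        // Hand the predicate to the Parquet source so filtering happens
        // during the scan itself (see `with_pushdown_filters` below).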
let parquet_source = ParquetSource::new(schema.clone())
.with_predicate(filter)
.with_pushdown_filters(true);
let object_store_url = ObjectStoreUrl::parse("memory://")?;
let store = state.runtime_env().object_store(object_store_url)?;
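        // List every object in the store; this example simply skips listing
        // errors rather than propagating them.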
let mut files = vec![];
let mut listing = store.list(None);
while let Some(file) = listing.next().await {
if let Ok(file) = file {
files.push(file);
}
}
let file_group = files
.iter()
.map(|file| PartitionedFile::new(file.location.clone(), file.size))
.collect();
let file_scan_config = FileScanConfigBuilder::new(
ObjectStoreUrl::parse("memory://")?,
Arc::new(parquet_source),
)
.with_projection_indices(projection.cloned())?
.with_limit(limit)
.with_file_group(file_group)
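        // The key hook: our factory is consulted for each file so expressions
        // can be rewritten against that file's physical schema.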
.with_expr_adapter(Some(Arc::new(DefaultValuePhysicalExprAdapterFactory) as _));
Ok(Arc::new(DataSourceExec::new(Arc::new(
file_scan_config.build(),
))))
}
}

/// Factory for creating DefaultValuePhysicalExprAdapter instances
#[derive(Debug)]
struct DefaultValuePhysicalExprAdapterFactory;

impl PhysicalExprAdapterFactory for DefaultValuePhysicalExprAdapterFactory {
fn create(
&self,
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
) -> Result<Arc<dyn PhysicalExprAdapter>> {
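        // Build the stock adapter eagerly; `rewrite` delegates to it after
        // injecting the metadata-driven defaults.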
let default_factory = DefaultPhysicalExprAdapterFactory;
let default_adapter = default_factory.create(
Arc::clone(&logical_file_schema),
Arc::clone(&physical_file_schema),
)?;
Ok(Arc::new(DefaultValuePhysicalExprAdapter {
logical_file_schema,
physical_file_schema,
default_adapter,
}))
}
}

/// Custom PhysicalExprAdapter that handles missing columns with default values from metadata
/// and wraps DefaultPhysicalExprAdapter for standard schema adaptation
#[derive(Debug)]
struct DefaultValuePhysicalExprAdapter {
logical_file_schema: SchemaRef,
physical_file_schema: SchemaRef,
default_adapter: Arc<dyn PhysicalExprAdapter>,
}

impl PhysicalExprAdapter for DefaultValuePhysicalExprAdapter {
fn rewrite(&self, expr: Arc<dyn PhysicalExpr>) -> Result<Arc<dyn PhysicalExpr>> {
// Pre-compute replacements for missing columns with default values
let mut replacements = HashMap::new();
for field in self.logical_file_schema.fields() {
// Skip columns that exist in physical schema
if self.physical_file_schema.index_of(field.name()).is_ok() {
continue;
}
// Check if this missing column has a default value in metadata
if let Some(default_str) = field.metadata().get(DEFAULT_VALUE_METADATA_KEY) {
// Create a Utf8 ScalarValue from the string and cast it to the target type
let string_value = ScalarValue::Utf8(Some(default_str.to_string()));
let typed_value = string_value.cast_to(field.data_type())?;
replacements.insert(field.name().as_str(), typed_value);
}
}
// Replace columns with their default literals if any
let rewritten = if !replacements.is_empty() {
let refs: HashMap<_, _> = replacements.iter().map(|(k, v)| (*k, v)).collect();
replace_columns_with_literals(expr, &refs)?
} else {
expr
};
// Apply the default adapter as a fallback for other schema adaptations
self.default_adapter.rewrite(rewritten)
}
}
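
// A minimal standalone check (hypothetical: the example is normally driven
// from `main.rs`, and this assumes `tokio` is available as a dev-dependency).
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn default_values_example_runs() -> Result<()> {
        default_column_values().await
    }
}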