Fix regression: incorrect results when reading parquet files with different schemas and statistics (#8533)
* Add test for schema evolution
* Fix reading parquet statistics
* Update tests for fix
* Add comments to help explain the test
* Add another test
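
The root cause: when pruning row groups, RowGroupPruningStatistics
resolved column indexes against the predicate's (table) schema and then
used those indexes to read the file's per-column statistics. When a
file's column order differs from the table's, the min/max values of the
wrong column were consulted, so row groups could be pruned or kept
incorrectly. The fix threads the actual file schema through to
prune_row_groups_by_statistics.

A minimal, self-contained sketch of the index mismatch (hypothetical
names, not the DataFusion API):

    fn main() {
        let table_schema = ["c1", "c2"]; // column order declared by the table
        let file_schema = ["c2", "c1"];  // column order actually in the file
        let file_max = [10_i32, -1];     // per-column max stats, in file order

        // Buggy lookup: resolve "c1" against the table schema...
        let i = table_schema.iter().position(|c| *c == "c1").unwrap();
        assert_eq!(file_max[i], 10); // ...which reads c2's max as if it were c1's

        // Fixed lookup: resolve "c1" against the file schema
        let i = file_schema.iter().position(|c| *c == "c1").unwrap();
        assert_eq!(file_max[i], -1); // c1's actual max
    }
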
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 718f9f8..641b7bb 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -468,8 +468,10 @@
ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await?;
+ let file_schema = builder.schema().clone();
+
let (schema_mapping, adapted_projections) =
- schema_adapter.map_schema(builder.schema())?;
+ schema_adapter.map_schema(&file_schema)?;
// let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;
let mask = ProjectionMask::roots(
@@ -481,8 +483,8 @@
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
&predicate,
- builder.schema().as_ref(),
- table_schema.as_ref(),
+ &file_schema,
+ &table_schema,
builder.metadata(),
reorder_predicates,
&file_metrics,
@@ -507,6 +509,7 @@
let file_metadata = builder.metadata().clone();
let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
let mut row_groups = row_groups::prune_row_groups_by_statistics(
+ &file_schema,
builder.parquet_schema(),
file_metadata.row_groups(),
file_range,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 65414f5..7c3f7d9 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -55,6 +55,7 @@
/// Note: This method currently ignores ColumnOrder
/// <https://github.com/apache/arrow-datafusion/issues/8335>
pub(crate) fn prune_row_groups_by_statistics(
+ arrow_schema: &Schema,
parquet_schema: &SchemaDescriptor,
groups: &[RowGroupMetaData],
range: Option<FileRange>,
@@ -80,7 +81,7 @@
let pruning_stats = RowGroupPruningStatistics {
parquet_schema,
row_group_metadata: metadata,
- arrow_schema: predicate.schema().as_ref(),
+ arrow_schema,
};
match predicate.prune(&pruning_stats) {
Ok(values) => {
@@ -416,11 +417,11 @@
fn row_group_pruning_predicate_simple_expr() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
- let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
let expr = col("c1").gt(lit(15));
let expr = logical2physical(&expr, &schema);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
let schema_descr = get_test_schema_descr(vec![field]);
@@ -436,6 +437,7 @@
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema,
&schema_descr,
&[rgm1, rgm2],
None,
@@ -450,11 +452,11 @@
fn row_group_pruning_predicate_missing_stats() {
use datafusion_expr::{col, lit};
// int > 1 => c1_max > 1
- let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+ let schema =
+ Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
let expr = col("c1").gt(lit(15));
let expr = logical2physical(&expr, &schema);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
let schema_descr = get_test_schema_descr(vec![field]);
@@ -471,6 +473,7 @@
// is null / undefined so the first row group can't be filtered out
assert_eq!(
prune_row_groups_by_statistics(
+ &schema,
&schema_descr,
&[rgm1, rgm2],
None,
@@ -519,6 +522,7 @@
// when conditions are joined using AND
assert_eq!(
prune_row_groups_by_statistics(
+ &schema,
&schema_descr,
groups,
None,
@@ -532,12 +536,13 @@
// this bypasses the entire predicate expression and no row groups are filtered out
let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
let expr = logical2physical(&expr, &schema);
- let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// if conditions in predicate are joined with OR and an unsupported expression is used
// this bypasses the entire predicate expression and no row groups are filtered out
assert_eq!(
prune_row_groups_by_statistics(
+ &schema,
&schema_descr,
groups,
None,
@@ -548,6 +553,64 @@
);
}
+ #[test]
+ fn row_group_pruning_predicate_file_schema() {
+ use datafusion_expr::{col, lit};
+        // test row group predicate when the file schema is different from the table schema
+ // c1 > 0
+ let table_schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ]));
+ let expr = col("c1").gt(lit(0));
+ let expr = logical2physical(&expr, &table_schema);
+ let pruning_predicate =
+ PruningPredicate::try_new(expr, table_schema.clone()).unwrap();
+
+        // Model a file whose schema stores the columns in the order c2, c1,
+        // the opposite of the table schema
+ let file_schema = Arc::new(Schema::new(vec![
+ Field::new("c2", DataType::Int32, false),
+ Field::new("c1", DataType::Int32, false),
+ ]));
+ let schema_descr = get_test_schema_descr(vec![
+ PrimitiveTypeField::new("c2", PhysicalType::INT32),
+ PrimitiveTypeField::new("c1", PhysicalType::INT32),
+ ]);
+ // rg1 has c2 less than zero, c1 greater than zero
+ let rgm1 = get_row_group_meta_data(
+ &schema_descr,
+ vec![
+ ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c2
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false), // c1
+ ],
+ );
+        // rg2 has c2 greater than zero, c1 less than zero
+ let rgm2 = get_row_group_meta_data(
+ &schema_descr,
+ vec![
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false), // c2
+                ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c1
+ ],
+ );
+
+ let metrics = parquet_file_metrics();
+ let groups = &[rgm1, rgm2];
+        // The first row group should be kept because its c1 values are greater than zero;
+        // the second should be filtered out because its c1 values are less than zero
+ assert_eq!(
+ prune_row_groups_by_statistics(
+ &file_schema, // NB must be file schema, not table_schema
+ &schema_descr,
+ groups,
+ None,
+ Some(&pruning_predicate),
+ &metrics
+ ),
+ vec![0]
+ );
+ }
+
fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
let schema_descr = get_test_schema_descr(vec![
PrimitiveTypeField::new("c1", PhysicalType::INT32),
@@ -581,13 +644,14 @@
let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
let expr = logical2physical(&expr, &schema);
- let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();
let metrics = parquet_file_metrics();
// First row group was filtered out because it contains no null value on "c2".
assert_eq!(
prune_row_groups_by_statistics(
+ &schema,
&schema_descr,
&groups,
None,
@@ -613,7 +677,7 @@
.gt(lit(15))
.and(col("c2").eq(lit(ScalarValue::Boolean(None))));
let expr = logical2physical(&expr, &schema);
- let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let groups = gen_row_group_meta_data_for_pruning_predicate();
let metrics = parquet_file_metrics();
@@ -621,6 +685,7 @@
// pass predicates. Ideally these should both be false
assert_eq!(
prune_row_groups_by_statistics(
+ &schema,
&schema_descr,
&groups,
None,
@@ -639,8 +704,11 @@
// INT32: c1 > 5, the c1 is decimal(9,2)
// The type of scalar value if decimal(9,2), don't need to do cast
- let schema =
- Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "c1",
+ DataType::Decimal128(9, 2),
+ false,
+ )]));
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
.with_logical_type(LogicalType::Decimal {
scale: 2,
@@ -651,8 +719,7 @@
let schema_descr = get_test_schema_descr(vec![field]);
let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
let expr = logical2physical(&expr, &schema);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [1.00, 6.00]
@@ -680,6 +747,7 @@
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
@@ -693,8 +761,11 @@
// The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2).
// We should convert all type to the coercion type, which is decimal(11,2)
// The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0)
- let schema =
- Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "c1",
+ DataType::Decimal128(9, 0),
+ false,
+ )]));
let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
.with_logical_type(LogicalType::Decimal {
@@ -709,8 +780,7 @@
Decimal128(11, 2),
));
let expr = logical2physical(&expr, &schema);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [100, 600]
@@ -744,6 +814,7 @@
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema,
&schema_descr,
&[rgm1, rgm2, rgm3, rgm4],
None,
@@ -754,8 +825,11 @@
);
// INT64: c1 < 5, the c1 is decimal(18,2)
- let schema =
- Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "c1",
+ DataType::Decimal128(18, 2),
+ false,
+ )]));
let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
.with_logical_type(LogicalType::Decimal {
scale: 2,
@@ -766,8 +840,7 @@
let schema_descr = get_test_schema_descr(vec![field]);
let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
let expr = logical2physical(&expr, &schema);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
let rgm1 = get_row_group_meta_data(
&schema_descr,
// [6.00, 8.00]
@@ -792,6 +865,7 @@
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
@@ -803,8 +877,11 @@
// FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
// the type of parquet is decimal(18,2)
- let schema =
- Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "c1",
+ DataType::Decimal128(18, 2),
+ false,
+ )]));
let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
.with_logical_type(LogicalType::Decimal {
scale: 2,
@@ -818,8 +895,7 @@
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
let rgm1 = get_row_group_meta_data(
&schema_descr,
@@ -863,6 +939,7 @@
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
@@ -874,8 +951,11 @@
// BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
// the type of parquet is decimal(18,2)
- let schema =
- Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "c1",
+ DataType::Decimal128(18, 2),
+ false,
+ )]));
let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
.with_logical_type(LogicalType::Decimal {
scale: 2,
@@ -889,8 +969,7 @@
let left = cast(col("c1"), DataType::Decimal128(28, 3));
let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
let expr = logical2physical(&expr, &schema);
- let pruning_predicate =
- PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+ let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
// we must use the big-endian when encode the i128 to bytes or vec[u8].
let rgm1 = get_row_group_meta_data(
&schema_descr,
@@ -923,6 +1002,7 @@
let metrics = parquet_file_metrics();
assert_eq!(
prune_row_groups_by_statistics(
+ &schema,
&schema_descr,
&[rgm1, rgm2, rgm3],
None,
diff --git a/datafusion/sqllogictest/test_files/schema_evolution.slt b/datafusion/sqllogictest/test_files/schema_evolution.slt
new file mode 100644
index 0000000..36d5415
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/schema_evolution.slt
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for schema evolution -- reading
+# data from different files with different schemas
+##########
+
+
+statement ok
+CREATE EXTERNAL TABLE parquet_table(a varchar, b int, c float) STORED AS PARQUET
+LOCATION 'test_files/scratch/schema_evolution/parquet_table/';
+
+# File1 has only columns a and b
+statement ok
+COPY (
+ SELECT column1 as a, column2 as b
+ FROM ( VALUES ('foo', 1), ('foo', 2), ('foo', 3) )
+ ) TO 'test_files/scratch/schema_evolution/parquet_table/1.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+
+# File2 has only column b
+statement ok
+COPY (
+ SELECT column1 as b
+ FROM ( VALUES (10) )
+ ) TO 'test_files/scratch/schema_evolution/parquet_table/2.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+# File3 has a column 'z' which does not appear in the table,
+# but also values for 'a' which does appear in the table
+statement ok
+COPY (
+ SELECT column1 as z, column2 as a
+ FROM ( VALUES ('bar', 'foo'), ('blarg', 'foo') )
+ ) TO 'test_files/scratch/schema_evolution/parquet_table/3.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+# File4 has data for b and a (in reverse order) and c
+statement ok
+COPY (
+ SELECT column1 as b, column2 as a, column3 as c
+ FROM ( VALUES (100, 'foo', 10.5), (200, 'foo', 12.6), (300, 'bzz', 13.7) )
+ ) TO 'test_files/scratch/schema_evolution/parquet_table/4.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+# The logical distribution of `a`, `b` and `c` in the files is like this:
+#
+## File1:
+# foo 1 NULL
+# foo 2 NULL
+# foo 3 NULL
+#
+## File2:
+# NULL 10 NULL
+#
+## File3:
+# foo NULL NULL
+# foo NULL NULL
+#
+## File4:
+# foo 100 10.5
+# foo 200 12.6
+# bzz 300 13.7
+
+# Show all the data
+query TIR rowsort
+select * from parquet_table;
+----
+NULL 10 NULL
+bzz 300 13.7
+foo 1 NULL
+foo 100 10.5
+foo 2 NULL
+foo 200 12.6
+foo 3 NULL
+foo NULL NULL
+foo NULL NULL
+
+# Should see all 7 rows that have 'a=foo'
+query TIR rowsort
+select * from parquet_table where a = 'foo';
+----
+foo 1 NULL
+foo 100 10.5
+foo 2 NULL
+foo 200 12.6
+foo 3 NULL
+foo NULL NULL
+foo NULL NULL
+
+query TIR rowsort
+select * from parquet_table where a != 'foo';
+----
+bzz 300 13.7
+
+# This should produce at least one row
+query TIR rowsort
+select * from parquet_table where a is NULL;
+----
+NULL 10 NULL
+
+query TIR rowsort
+select * from parquet_table where b > 5;
+----
+NULL 10 NULL
+bzz 300 13.7
+foo 100 10.5
+foo 200 12.6
+
+
+query TIR rowsort
+select * from parquet_table where b < 150;
+----
+NULL 10 NULL
+foo 1 NULL
+foo 100 10.5
+foo 2 NULL
+foo 3 NULL
+
+query TIR rowsort
+select * from parquet_table where c > 11.0;
+----
+bzz 300 13.7
+foo 200 12.6