Fix regression with Incorrect results when reading parquet files with different schemas and statistics (#8533)

* Add test for schema evolution

* Fix reading parquet statistics

* Update tests for fix

* Add comments to help explain the test

* Add another test
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 718f9f8..641b7bb 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -468,8 +468,10 @@
                 ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
                     .await?;
 
+            let file_schema = builder.schema().clone();
+
             let (schema_mapping, adapted_projections) =
-                schema_adapter.map_schema(builder.schema())?;
+                schema_adapter.map_schema(&file_schema)?;
             // let predicate = predicate.map(|p| reassign_predicate_columns(p, builder.schema(), true)).transpose()?;
 
             let mask = ProjectionMask::roots(
@@ -481,8 +483,8 @@
             if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
                 let row_filter = row_filter::build_row_filter(
                     &predicate,
-                    builder.schema().as_ref(),
-                    table_schema.as_ref(),
+                    &file_schema,
+                    &table_schema,
                     builder.metadata(),
                     reorder_predicates,
                     &file_metrics,
@@ -507,6 +509,7 @@
             let file_metadata = builder.metadata().clone();
             let predicate = pruning_predicate.as_ref().map(|p| p.as_ref());
             let mut row_groups = row_groups::prune_row_groups_by_statistics(
+                &file_schema,
                 builder.parquet_schema(),
                 file_metadata.row_groups(),
                 file_range,
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index 65414f5..7c3f7d9 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -55,6 +55,7 @@
 /// Note: This method currently ignores ColumnOrder
 /// <https://github.com/apache/arrow-datafusion/issues/8335>
 pub(crate) fn prune_row_groups_by_statistics(
+    arrow_schema: &Schema,
     parquet_schema: &SchemaDescriptor,
     groups: &[RowGroupMetaData],
     range: Option<FileRange>,
@@ -80,7 +81,7 @@
             let pruning_stats = RowGroupPruningStatistics {
                 parquet_schema,
                 row_group_metadata: metadata,
-                arrow_schema: predicate.schema().as_ref(),
+                arrow_schema,
             };
             match predicate.prune(&pruning_stats) {
                 Ok(values) => {
@@ -416,11 +417,11 @@
     fn row_group_pruning_predicate_simple_expr() {
         use datafusion_expr::{col, lit};
         // int > 1 => c1_max > 1
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
 
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
         let schema_descr = get_test_schema_descr(vec![field]);
@@ -436,6 +437,7 @@
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2],
                 None,
@@ -450,11 +452,11 @@
     fn row_group_pruning_predicate_missing_stats() {
         use datafusion_expr::{col, lit};
         // int > 1 => c1_max > 1
-        let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
+        let schema =
+            Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
         let expr = col("c1").gt(lit(15));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
 
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32);
         let schema_descr = get_test_schema_descr(vec![field]);
@@ -471,6 +473,7 @@
         // is null / undefined so the first row group can't be filtered out
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2],
                 None,
@@ -519,6 +522,7 @@
         // when conditions are joined using AND
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 groups,
                 None,
@@ -532,12 +536,13 @@
         // this bypasses the entire predicate expression and no row groups are filtered out
         let expr = col("c1").gt(lit(15)).or(col("c2").rem(lit(2)).eq(lit(0)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
 
         // if conditions in predicate are joined with OR and an unsupported expression is used
         // this bypasses the entire predicate expression and no row groups are filtered out
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 groups,
                 None,
@@ -548,6 +553,64 @@
         );
     }
 
+    #[test]
+    fn row_group_pruning_predicate_file_schema() {
+        use datafusion_expr::{col, lit};
+        // Test row group pruning when the file schema differs from the table schema.
+        // Predicate under test: c1 > 0 (built against the table schema, c1 then c2)
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, false),
+            Field::new("c2", DataType::Int32, false),
+        ]));
+        let expr = col("c1").gt(lit(0));
+        let expr = logical2physical(&expr, &table_schema);
+        let pruning_predicate =
+            PruningPredicate::try_new(expr, table_schema.clone()).unwrap();
+
+        // Model a file whose columns are ordered c2 then c1, which is the
+        // opposite of the table schema's order
+        let file_schema = Arc::new(Schema::new(vec![
+            Field::new("c2", DataType::Int32, false),
+            Field::new("c1", DataType::Int32, false),
+        ]));
+        let schema_descr = get_test_schema_descr(vec![
+            PrimitiveTypeField::new("c2", PhysicalType::INT32),
+            PrimitiveTypeField::new("c1", PhysicalType::INT32),
+        ]);
+        // rg1 has c2 less than zero, c1 greater than zero
+        let rgm1 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c2
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false), // c1
+            ],
+        );
+        // rg2 has c2 greater than zero, c1 less than zero
+        let rgm2 = get_row_group_meta_data(
+            &schema_descr,
+            vec![
+                ParquetStatistics::int32(Some(1), Some(10), None, 0, false), // c2
+                ParquetStatistics::int32(Some(-10), Some(-1), None, 0, false), // c1
+            ],
+        );
+
+        let metrics = parquet_file_metrics();
+        let groups = &[rgm1, rgm2];
+        // the first row group should be kept because its c1 is greater than zero
+        // the second should be filtered out because its c1 is less than zero
+        assert_eq!(
+            prune_row_groups_by_statistics(
+                &file_schema, // NB must be file schema, not table_schema
+                &schema_descr,
+                groups,
+                None,
+                Some(&pruning_predicate),
+                &metrics
+            ),
+            vec![0]
+        );
+    }
+
     fn gen_row_group_meta_data_for_pruning_predicate() -> Vec<RowGroupMetaData> {
         let schema_descr = get_test_schema_descr(vec![
             PrimitiveTypeField::new("c1", PhysicalType::INT32),
@@ -581,13 +644,14 @@
         let schema_descr = arrow_to_parquet_schema(&schema).unwrap();
         let expr = col("c1").gt(lit(15)).and(col("c2").is_null());
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();
 
         let metrics = parquet_file_metrics();
         // First row group was filtered out because it contains no null value on "c2".
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &groups,
                 None,
@@ -613,7 +677,7 @@
             .gt(lit(15))
             .and(col("c2").eq(lit(ScalarValue::Boolean(None))));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate = PruningPredicate::try_new(expr, schema).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let groups = gen_row_group_meta_data_for_pruning_predicate();
 
         let metrics = parquet_file_metrics();
@@ -621,6 +685,7 @@
         // pass predicates. Ideally these should both be false
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &groups,
                 None,
@@ -639,8 +704,11 @@
 
         // INT32: c1 > 5, the c1 is decimal(9,2)
         // The type of scalar value if decimal(9,2), don't need to do cast
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(9, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -651,8 +719,7 @@
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").gt(lit(ScalarValue::Decimal128(Some(500), 9, 2)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [1.00, 6.00]
@@ -680,6 +747,7 @@
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
@@ -693,8 +761,11 @@
         // The c1 type is decimal(9,0) in the parquet file, and the type of scalar is decimal(5,2).
         // We should convert all type to the coercion type, which is decimal(11,2)
         // The decimal of arrow is decimal(5,2), the decimal of parquet is decimal(9,0)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(9, 0), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(9, 0),
+            false,
+        )]));
 
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT32)
             .with_logical_type(LogicalType::Decimal {
@@ -709,8 +780,7 @@
             Decimal128(11, 2),
         ));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [100, 600]
@@ -744,6 +814,7 @@
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3, rgm4],
                 None,
@@ -754,8 +825,11 @@
         );
 
         // INT64: c1 < 5, the c1 is decimal(18,2)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(18, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::INT64)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -766,8 +840,7 @@
         let schema_descr = get_test_schema_descr(vec![field]);
         let expr = col("c1").lt(lit(ScalarValue::Decimal128(Some(500), 18, 2)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
             // [6.00, 8.00]
@@ -792,6 +865,7 @@
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
@@ -803,8 +877,11 @@
 
         // FIXED_LENGTH_BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
         // the type of parquet is decimal(18,2)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(18, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::FIXED_LEN_BYTE_ARRAY)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -818,8 +895,7 @@
         let left = cast(col("c1"), DataType::Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         // we must use the big-endian when encode the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -863,6 +939,7 @@
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
@@ -874,8 +951,11 @@
 
         // BYTE_ARRAY: c1 = decimal128(100000, 28, 3), the c1 is decimal(18,2)
         // the type of parquet is decimal(18,2)
-        let schema =
-            Schema::new(vec![Field::new("c1", DataType::Decimal128(18, 2), false)]);
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "c1",
+            DataType::Decimal128(18, 2),
+            false,
+        )]));
         let field = PrimitiveTypeField::new("c1", PhysicalType::BYTE_ARRAY)
             .with_logical_type(LogicalType::Decimal {
                 scale: 2,
@@ -889,8 +969,7 @@
         let left = cast(col("c1"), DataType::Decimal128(28, 3));
         let expr = left.eq(lit(ScalarValue::Decimal128(Some(100000), 28, 3)));
         let expr = logical2physical(&expr, &schema);
-        let pruning_predicate =
-            PruningPredicate::try_new(expr, Arc::new(schema)).unwrap();
+        let pruning_predicate = PruningPredicate::try_new(expr, schema.clone()).unwrap();
         // we must use the big-endian when encode the i128 to bytes or vec[u8].
         let rgm1 = get_row_group_meta_data(
             &schema_descr,
@@ -923,6 +1002,7 @@
         let metrics = parquet_file_metrics();
         assert_eq!(
             prune_row_groups_by_statistics(
+                &schema,
                 &schema_descr,
                 &[rgm1, rgm2, rgm3],
                 None,
diff --git a/datafusion/sqllogictest/test_files/schema_evolution.slt b/datafusion/sqllogictest/test_files/schema_evolution.slt
new file mode 100644
index 0000000..36d5415
--- /dev/null
+++ b/datafusion/sqllogictest/test_files/schema_evolution.slt
@@ -0,0 +1,140 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+# Tests for schema evolution -- reading
+# data from different files with different schemas
+##########
+
+
+statement ok
+CREATE EXTERNAL TABLE parquet_table(a varchar, b int, c float) STORED AS PARQUET
+LOCATION 'test_files/scratch/schema_evolution/parquet_table/';
+
+# File1 has only columns a and b
+statement ok
+COPY  (
+  SELECT column1 as a, column2 as b
+  FROM ( VALUES ('foo', 1), ('foo', 2), ('foo', 3) )
+ )  TO 'test_files/scratch/schema_evolution/parquet_table/1.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+
+# File2 has only b
+statement ok
+COPY  (
+  SELECT column1 as b
+  FROM ( VALUES (10) )
+ )  TO 'test_files/scratch/schema_evolution/parquet_table/2.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+# File3 has a column 'z' which does not appear in the table
+# but also values for 'a' which does appear in the table
+statement ok
+COPY  (
+  SELECT column1 as z, column2 as a
+    FROM ( VALUES ('bar', 'foo'), ('blarg', 'foo') )
+ )  TO 'test_files/scratch/schema_evolution/parquet_table/3.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+# File4 has data for b and a (reversed) and c
+statement ok
+COPY  (
+  SELECT column1 as b, column2 as a, column3 as c
+    FROM ( VALUES (100, 'foo', 10.5), (200, 'foo', 12.6), (300, 'bzz', 13.7) )
+ )  TO 'test_files/scratch/schema_evolution/parquet_table/4.parquet'
+(FORMAT PARQUET, SINGLE_FILE_OUTPUT true);
+
+# The logical distribution of `a`, `b` and `c` in the files is like this:
+#
+## File1:
+# foo 1 NULL
+# foo 2 NULL
+# foo 3 NULL
+#
+## File2:
+# NULL 10 NULL
+#
+## File3:
+# foo NULL NULL
+# foo NULL NULL
+#
+## File4:
+# foo 100 10.5
+# foo 200 12.6
+# bzz 300 13.7
+
+# Show all the data
+query TIR rowsort
+select * from parquet_table;
+----
+NULL 10 NULL
+bzz 300 13.7
+foo 1 NULL
+foo 100 10.5
+foo 2 NULL
+foo 200 12.6
+foo 3 NULL
+foo NULL NULL
+foo NULL NULL
+
+# Should see all 7 rows that have 'a=foo'
+query TIR rowsort
+select * from parquet_table where a = 'foo';
+----
+foo 1 NULL
+foo 100 10.5
+foo 2 NULL
+foo 200 12.6
+foo 3 NULL
+foo NULL NULL
+foo NULL NULL
+
+query TIR rowsort
+select * from parquet_table where a != 'foo';
+----
+bzz 300 13.7
+
+# this should produce at least one row
+query TIR rowsort
+select * from parquet_table where a is NULL;
+----
+NULL 10 NULL
+
+query TIR rowsort
+select * from parquet_table where b > 5;
+----
+NULL 10 NULL
+bzz 300 13.7
+foo 100 10.5
+foo 200 12.6
+
+
+query TIR rowsort
+select * from parquet_table where b < 150;
+----
+NULL 10 NULL
+foo 1 NULL
+foo 100 10.5
+foo 2 NULL
+foo 3 NULL
+
+query TIR rowsort
+select * from parquet_table where c > 11.0;
+----
+bzz 300 13.7
+foo 200 12.6