Reduce cloning of `Statistics` in `ListingTable` and `PartitionedFile` (#11802)

* reduce cloning of `Statistics` by sharing it behind an `Arc` (sketched below).

* optimize `get_statistics_with_limit` and `split_files`.

* create the column statistics set directly instead of multiunzipping separate min/max/null-count vectors.

* fix the protobuf code.

* fix `cargo fmt` warnings.

* fix clippy lints.

* fix compilation.

* remove stale code.

* optimize `split_files` by draining the input vector instead of cloning each chunk (sketched below).

* remove the `Default` implementation for `PartitionedFile`.

* don't keep `Arc<Statistics>` in `PartitionedFile`.

* fix the protobuf code.
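
The central refactor, before the diff: file-level statistics move behind `Arc<Statistics>`, so handing them to the statistics cache and to `get_statistics_with_limit` bumps a reference count instead of deep-copying every `ColumnStatistics`. A minimal sketch, assuming a simplified one-field stand-in for `datafusion_common::Statistics`:

```rust
use std::sync::Arc;

// Hypothetical stand-in for `datafusion_common::Statistics`; the real
// type carries per-column min/max/null-count values, so a deep clone
// is O(columns).
#[derive(Clone)]
struct Statistics {
    column_statistics: Vec<u64>,
}

fn main() {
    let stats = Arc::new(Statistics {
        column_statistics: vec![0; 1024],
    });

    // Cloning the `Arc` copies a pointer and bumps a refcount; the
    // underlying column statistics are shared, not duplicated.
    let shared = Arc::clone(&stats);
    assert_eq!(Arc::strong_count(&stats), 2);

    // A deep copy is still available where an owned value is needed,
    // e.g. when filling `PartitionedFile::statistics`:
    let owned: Statistics = shared.as_ref().clone();
    assert_eq!(owned.column_statistics.len(), 1024);
}
```
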
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 29b593a..67af8ef 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -18,6 +18,7 @@
 //! Helper functions for the table implementation
 
 use std::collections::HashMap;
+use std::mem;
 use std::sync::Arc;
 
 use super::PartitionedFile;
@@ -138,10 +139,22 @@
 
     // effectively this is div with rounding up instead of truncating
     let chunk_size = (partitioned_files.len() + n - 1) / n;
-    partitioned_files
-        .chunks(chunk_size)
-        .map(|c| c.to_vec())
-        .collect()
+    let mut chunks = Vec::with_capacity(n);
+    let mut current_chunk = Vec::with_capacity(chunk_size);
+    for file in partitioned_files.drain(..) {
+        current_chunk.push(file);
+        if current_chunk.len() == chunk_size {
+            let full_chunk =
+                mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
+            chunks.push(full_chunk);
+        }
+    }
+
+    if !current_chunk.is_empty() {
+        chunks.push(current_chunk)
+    }
+
+    chunks
 }
 
 struct Partition {
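
The hunk above replaces `chunks(chunk_size).map(|c| c.to_vec()).collect()`, which cloned every `PartitionedFile`, with a move-based loop. A self-contained sketch of the same pattern, with a generic `T` standing in for `PartitionedFile` and the hypothetical name `split_into_chunks`:

```rust
use std::mem;

/// Move `files` into at most `n` chunks without cloning any element.
/// Note there is no `T: Clone` bound: `drain(..)` yields owned values
/// and moves them into the chunks.
fn split_into_chunks<T>(mut files: Vec<T>, n: usize) -> Vec<Vec<T>> {
    if files.is_empty() {
        return vec![];
    }
    // Division with rounding up, as in the patched `split_files`.
    let chunk_size = (files.len() + n - 1) / n;

    let mut chunks = Vec::with_capacity(n);
    let mut current_chunk = Vec::with_capacity(chunk_size);
    for file in files.drain(..) {
        current_chunk.push(file);
        if current_chunk.len() == chunk_size {
            // Swap in a fresh buffer and ship the full one.
            chunks.push(mem::replace(
                &mut current_chunk,
                Vec::with_capacity(chunk_size),
            ));
        }
    }
    if !current_chunk.is_empty() {
        chunks.push(current_chunk);
    }
    chunks
}

fn main() {
    let chunks = split_into_chunks((0..7).collect::<Vec<_>>(), 3);
    assert_eq!(chunks, vec![vec![0, 1, 2], vec![3, 4, 5], vec![6]]);
}
```
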
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 44f9276..21a6061 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -82,6 +82,7 @@
     /// An optional field for user defined per object metadata
     pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
 }
+
 impl PartitionedFile {
     /// Create a simple file without metadata or partition
     pub fn new(path: impl Into<String>, size: u64) -> Self {
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 80f49e4..bb86ac3 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -973,15 +973,16 @@
         // collect the statistics if required by the config
         let files = file_list
             .map(|part_file| async {
-                let mut part_file = part_file?;
+                let part_file = part_file?;
                 if self.options.collect_stat {
                     let statistics =
                         self.do_collect_statistics(ctx, &store, &part_file).await?;
-                    part_file.statistics = Some(statistics.clone());
-                    Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
+                    Ok((part_file, statistics))
                 } else {
-                    Ok((part_file, Statistics::new_unknown(&self.file_schema)))
-                        as Result<(PartitionedFile, Statistics)>
+                    Ok((
+                        part_file,
+                        Arc::new(Statistics::new_unknown(&self.file_schema)),
+                    ))
                 }
             })
             .boxed()
@@ -1011,12 +1012,12 @@
         ctx: &SessionState,
         store: &Arc<dyn ObjectStore>,
         part_file: &PartitionedFile,
-    ) -> Result<Statistics> {
-        let statistics_cache = self.collected_statistics.clone();
-        return match statistics_cache
+    ) -> Result<Arc<Statistics>> {
+        match self
+            .collected_statistics
             .get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
         {
-            Some(statistics) => Ok(statistics.as_ref().clone()),
+            Some(statistics) => Ok(statistics.clone()),
             None => {
                 let statistics = self
                     .options
@@ -1028,14 +1029,15 @@
                         &part_file.object_meta,
                     )
                     .await?;
-                statistics_cache.put_with_extra(
+                let statistics = Arc::new(statistics);
+                self.collected_statistics.put_with_extra(
                     &part_file.object_meta.location,
-                    statistics.clone().into(),
+                    statistics.clone(),
                     &part_file.object_meta,
                 );
                 Ok(statistics)
             }
-        };
+        }
     }
 }
 
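
The `table.rs` hunks above make `do_collect_statistics` return `Arc<Statistics>`, so a statistics-cache hit clones a pointer rather than the statistics themselves. Below is a hedged sketch of that cache-or-compute shape; the `get_or_collect` helper, the `Mutex<HashMap<..>>` cache, and the one-field `Statistics` are illustrative stand-ins for the real `collected_statistics` cache and its `get_with_extra`/`put_with_extra` API:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Illustrative stand-ins: a plain string path instead of an
// object-store location, and a one-field `Statistics`.
type Path = String;
struct Statistics {
    num_rows: usize,
}

fn get_or_collect(
    cache: &Mutex<HashMap<Path, Arc<Statistics>>>,
    path: &Path,
    collect: impl FnOnce() -> Statistics,
) -> Arc<Statistics> {
    if let Some(stats) = cache.lock().unwrap().get(path) {
        // Cache hit: clone the `Arc`, not the statistics.
        return Arc::clone(stats);
    }
    // Cache miss: collect once, wrap once, share everywhere.
    let stats = Arc::new(collect());
    cache.lock().unwrap().insert(path.clone(), Arc::clone(&stats));
    stats
}

fn main() {
    let cache = Mutex::new(HashMap::new());
    let path = "part-0.parquet".to_string();
    let first = get_or_collect(&cache, &path, || Statistics { num_rows: 100 });
    let second = get_or_collect(&cache, &path, || unreachable!("already cached"));
    assert!(Arc::ptr_eq(&first, &second));
    assert_eq!(second.num_rows, 100);
}
```
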
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index 8c789e4..9d031a6 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::mem;
+use std::sync::Arc;
+
 use super::listing::PartitionedFile;
 use crate::arrow::datatypes::{Schema, SchemaRef};
 use crate::error::Result;
@@ -26,8 +29,6 @@
 use datafusion_common::ScalarValue;
 
 use futures::{Stream, StreamExt};
-use itertools::izip;
-use itertools::multiunzip;
 
 /// Get all files as well as the file level summary statistics (no statistic for partition columns).
 /// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
@@ -35,7 +36,7 @@
 /// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive
 ///  call to `multiunzip` for constructing file level summary statistics.
 pub async fn get_statistics_with_limit(
-    all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
+    all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
     file_schema: SchemaRef,
     limit: Option<usize>,
     collect_stats: bool,
@@ -48,9 +49,7 @@
     // - zero for summations, and
     // - neutral element for extreme points.
     let size = file_schema.fields().len();
-    let mut null_counts: Vec<Precision<usize>> = vec![Precision::Absent; size];
-    let mut max_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
-    let mut min_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
+    let mut col_stats_set = vec![ColumnStatistics::default(); size];
     let mut num_rows = Precision::<usize>::Absent;
     let mut total_byte_size = Precision::<usize>::Absent;
 
@@ -58,16 +57,19 @@
     let mut all_files = Box::pin(all_files.fuse());
 
     if let Some(first_file) = all_files.next().await {
-        let (file, file_stats) = first_file?;
+        let (mut file, file_stats) = first_file?;
+        file.statistics = Some(file_stats.as_ref().clone());
         result_files.push(file);
 
         // First file, we set them directly from the file statistics.
-        num_rows = file_stats.num_rows;
-        total_byte_size = file_stats.total_byte_size;
-        for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() {
-            null_counts[index] = file_column.null_count;
-            max_values[index] = file_column.max_value;
-            min_values[index] = file_column.min_value;
+        num_rows = file_stats.num_rows.clone();
+        total_byte_size = file_stats.total_byte_size.clone();
+        for (index, file_column) in
+            file_stats.column_statistics.clone().into_iter().enumerate()
+        {
+            col_stats_set[index].null_count = file_column.null_count;
+            col_stats_set[index].max_value = file_column.max_value;
+            col_stats_set[index].min_value = file_column.min_value;
         }
 
         // If the number of rows exceeds the limit, we can stop processing
@@ -80,7 +82,8 @@
         };
         if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
             while let Some(current) = all_files.next().await {
-                let (file, file_stats) = current?;
+                let (mut file, file_stats) = current?;
+                file.statistics = Some(file_stats.as_ref().clone());
                 result_files.push(file);
                 if !collect_stats {
                     continue;
@@ -90,38 +93,28 @@
                 // counts across all the files in question. If any file does not
                 // provide any information or provides an inexact value, we demote
                 // the statistic precision to inexact.
-                num_rows = add_row_stats(file_stats.num_rows, num_rows);
+                num_rows = add_row_stats(file_stats.num_rows.clone(), num_rows);
 
                 total_byte_size =
-                    add_row_stats(file_stats.total_byte_size, total_byte_size);
+                    add_row_stats(file_stats.total_byte_size.clone(), total_byte_size);
 
-                (null_counts, max_values, min_values) = multiunzip(
-                    izip!(
-                        file_stats.column_statistics.into_iter(),
-                        null_counts.into_iter(),
-                        max_values.into_iter(),
-                        min_values.into_iter()
-                    )
-                    .map(
-                        |(
-                            ColumnStatistics {
-                                null_count: file_nc,
-                                max_value: file_max,
-                                min_value: file_min,
-                                distinct_count: _,
-                            },
-                            null_count,
-                            max_value,
-                            min_value,
-                        )| {
-                            (
-                                add_row_stats(file_nc, null_count),
-                                set_max_if_greater(file_max, max_value),
-                                set_min_if_lesser(file_min, min_value),
-                            )
-                        },
-                    ),
-                );
+                for (file_col_stats, col_stats) in file_stats
+                    .column_statistics
+                    .iter()
+                    .zip(col_stats_set.iter_mut())
+                {
+                    let ColumnStatistics {
+                        null_count: file_nc,
+                        max_value: file_max,
+                        min_value: file_min,
+                        distinct_count: _,
+                    } = file_col_stats;
+
+                    col_stats.null_count =
+                        add_row_stats(file_nc.clone(), col_stats.null_count.clone());
+                    set_max_if_greater(file_max, &mut col_stats.max_value);
+                    set_min_if_lesser(file_min, &mut col_stats.min_value)
+                }
 
                 // If the number of rows exceeds the limit, we can stop processing
                 // files. This only applies when we know the number of rows. It also
@@ -139,7 +132,7 @@
     let mut statistics = Statistics {
         num_rows,
         total_byte_size,
-        column_statistics: get_col_stats_vec(null_counts, max_values, min_values),
+        column_statistics: col_stats_set,
     };
     if all_files.next().await.is_some() {
         // If we still have files in the stream, it means that the limit kicked
@@ -182,21 +175,6 @@
     }
 }
 
-pub(crate) fn get_col_stats_vec(
-    null_counts: Vec<Precision<usize>>,
-    max_values: Vec<Precision<ScalarValue>>,
-    min_values: Vec<Precision<ScalarValue>>,
-) -> Vec<ColumnStatistics> {
-    izip!(null_counts, max_values, min_values)
-        .map(|(null_count, max_value, min_value)| ColumnStatistics {
-            null_count,
-            max_value,
-            min_value,
-            distinct_count: Precision::Absent,
-        })
-        .collect()
-}
-
 pub(crate) fn get_col_stats(
     schema: &Schema,
     null_counts: Vec<Precision<usize>>,
@@ -238,45 +216,61 @@
 /// If the given value is numerically greater than the current maximum, update
 /// the maximum in place with appropriate exactness information.
 fn set_max_if_greater(
-    max_nominee: Precision<ScalarValue>,
-    max_values: Precision<ScalarValue>,
-) -> Precision<ScalarValue> {
-    match (&max_values, &max_nominee) {
-        (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => max_nominee,
+    max_nominee: &Precision<ScalarValue>,
+    max_value: &mut Precision<ScalarValue>,
+) {
+    match (&max_value, max_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
+            *max_value = max_nominee.clone();
+        }
         (Precision::Exact(val1), Precision::Inexact(val2))
         | (Precision::Inexact(val1), Precision::Inexact(val2))
         | (Precision::Inexact(val1), Precision::Exact(val2))
             if val1 < val2 =>
         {
-            max_nominee.to_inexact()
+            *max_value = max_nominee.clone().to_inexact();
         }
-        (Precision::Exact(_), Precision::Absent) => max_values.to_inexact(),
-        (Precision::Absent, Precision::Exact(_)) => max_nominee.to_inexact(),
-        (Precision::Absent, Precision::Inexact(_)) => max_nominee,
-        (Precision::Absent, Precision::Absent) => Precision::Absent,
-        _ => max_values,
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_max = mem::take(max_value);
+            *max_value = exact_max.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *max_value = max_nominee.clone().to_inexact();
+        }
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *max_value = max_nominee.clone();
+        }
+        _ => {}
     }
 }
 
 /// If the given value is numerically smaller than the current minimum, update
 /// the minimum in place with appropriate exactness information.
 fn set_min_if_lesser(
-    min_nominee: Precision<ScalarValue>,
-    min_values: Precision<ScalarValue>,
-) -> Precision<ScalarValue> {
-    match (&min_values, &min_nominee) {
-        (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => min_nominee,
+    min_nominee: &Precision<ScalarValue>,
+    min_value: &mut Precision<ScalarValue>,
+) {
+    match (&min_value, min_nominee) {
+        (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
+            *min_value = min_nominee.clone();
+        }
         (Precision::Exact(val1), Precision::Inexact(val2))
         | (Precision::Inexact(val1), Precision::Inexact(val2))
         | (Precision::Inexact(val1), Precision::Exact(val2))
             if val1 > val2 =>
         {
-            min_nominee.to_inexact()
+            *min_value = min_nominee.clone().to_inexact();
         }
-        (Precision::Exact(_), Precision::Absent) => min_values.to_inexact(),
-        (Precision::Absent, Precision::Exact(_)) => min_nominee.to_inexact(),
-        (Precision::Absent, Precision::Inexact(_)) => min_nominee,
-        (Precision::Absent, Precision::Absent) => Precision::Absent,
-        _ => min_values,
+        (Precision::Exact(_), Precision::Absent) => {
+            let exact_min = mem::take(min_value);
+            *min_value = exact_min.to_inexact();
+        }
+        (Precision::Absent, Precision::Exact(_)) => {
+            *min_value = min_nominee.clone().to_inexact();
+        }
+        (Precision::Absent, Precision::Inexact(_)) => {
+            *min_value = min_nominee.clone();
+        }
+        _ => {}
     }
 }
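
For reference, a self-contained model of the in-place merge introduced above. The simplified `Precision` enum is an assumption standing in for `datafusion_common::stats::Precision<ScalarValue>`; `mem::take` in the `(Exact, Absent)` arm works because `Absent` is the default, mirroring how the patch demotes an exact value to inexact without cloning it:

```rust
// Simplified model of `Precision`; the real enum is generic over the
// wrapped value type.
#[derive(Clone, Debug, Default, PartialEq)]
enum Precision {
    Exact(i64),
    Inexact(i64),
    #[default]
    Absent,
}

impl Precision {
    fn to_inexact(self) -> Self {
        match self {
            Precision::Exact(v) => Precision::Inexact(v),
            other => other,
        }
    }
}

/// In-place analogue of the patched `set_max_if_greater`: a larger
/// nominee replaces the running maximum, and any mixture with inexact
/// or absent information demotes the result to `Inexact`.
fn set_max_if_greater(nominee: &Precision, max: &mut Precision) {
    match (&*max, nominee) {
        (Precision::Exact(a), Precision::Exact(b)) if a < b => {
            *max = nominee.clone();
        }
        (Precision::Exact(a), Precision::Inexact(b))
        | (Precision::Inexact(a), Precision::Inexact(b))
        | (Precision::Inexact(a), Precision::Exact(b))
            if a < b =>
        {
            *max = nominee.clone().to_inexact();
        }
        // One side absent: whatever survives is only inexact.
        (Precision::Exact(_), Precision::Absent) => {
            *max = std::mem::take(max).to_inexact();
        }
        (Precision::Absent, Precision::Exact(_)) => {
            *max = nominee.clone().to_inexact();
        }
        (Precision::Absent, Precision::Inexact(_)) => {
            *max = nominee.clone();
        }
        _ => {}
    }
}

fn main() {
    let mut max = Precision::Exact(10);
    set_max_if_greater(&Precision::Inexact(42), &mut max);
    assert_eq!(max, Precision::Inexact(42));

    set_max_if_greater(&Precision::Absent, &mut max);
    assert_eq!(max, Precision::Inexact(42)); // inexact stays inexact
}
```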