Reduce cloning of `Statistics` in `ListingTable` and `PartitionedFile` (#11802)
* reduce cloning of `Statistics` by wrapping it in `Arc`.
* optimize `get_statistics_with_limit` and `split_files`.
* create the column statistics set directly.
* fix pb.
* fix fmt.
* fix clippy.
* fix compile.
* remove stale code.
* optimize `split_files` by using drain.
* remove `Default` for `PartitionedFile`.
* don't keep `Arc<Statistics>` in `PartitionedFile`.
* fix pb.
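
The core change: `Statistics` carries per-column `ScalarValue` bounds, so deep-cloning it once per file adds up on large listings. Sharing it behind an `Arc` turns every extra reference into a refcount bump. A minimal sketch of the pattern, with a simplified stand-in for DataFusion's `Statistics`:

```rust
use std::sync::Arc;

// Simplified stand-in for DataFusion's `Statistics`; the real struct holds
// `Precision<usize>` counts plus per-column `ScalarValue` min/max bounds.
#[derive(Clone)]
struct Statistics {
    num_rows: usize,
    column_mins: Vec<String>, // stands in for Vec<ScalarValue>
}

fn main() {
    let stats = Arc::new(Statistics {
        num_rows: 1_000,
        column_mins: vec!["a".into(); 64],
    });

    // Before this PR, handing statistics to the cache and the caller cloned
    // the whole struct, including every column value. Cloning the `Arc`
    // instead only bumps a reference count; the data is allocated once.
    let shared_with_cache = Arc::clone(&stats);
    let shared_with_caller = Arc::clone(&stats);

    assert_eq!(Arc::strong_count(&stats), 3);
    assert_eq!(shared_with_cache.num_rows, shared_with_caller.num_rows);
    assert_eq!(shared_with_cache.column_mins.len(), 64);
}
```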
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 29b593a..67af8ef 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -18,6 +18,7 @@
//! Helper functions for the table implementation
use std::collections::HashMap;
+use std::mem;
use std::sync::Arc;
use super::PartitionedFile;
@@ -138,10 +139,22 @@
// effectively this is div with rounding up instead of truncating
let chunk_size = (partitioned_files.len() + n - 1) / n;
- partitioned_files
- .chunks(chunk_size)
- .map(|c| c.to_vec())
- .collect()
+ let mut chunks = Vec::with_capacity(n);
+ let mut current_chunk = Vec::with_capacity(chunk_size);
+ for file in partitioned_files.drain(..) {
+ current_chunk.push(file);
+ if current_chunk.len() == chunk_size {
+ let full_chunk =
+ mem::replace(&mut current_chunk, Vec::with_capacity(chunk_size));
+ chunks.push(full_chunk);
+ }
+ }
+
+ if !current_chunk.is_empty() {
+ chunks.push(current_chunk)
+ }
+
+ chunks
}
struct Partition {
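
For reference, the drain-based chunking above as a standalone, generic sketch. The old `chunks(chunk_size).map(|c| c.to_vec())` cloned every `PartitionedFile`; `Vec::drain` moves each element into its chunk instead, so no `Clone` is needed at all:

```rust
/// Standalone sketch of the drain-based chunking used by `split_files`.
/// Assumes `n >= 1`, which `split_files`' caller guarantees.
fn split_into_chunks<T>(mut items: Vec<T>, n: usize) -> Vec<Vec<T>> {
    if items.is_empty() {
        return vec![];
    }
    // Effectively div with rounding up instead of truncating.
    let chunk_size = (items.len() + n - 1) / n;
    let mut chunks = Vec::with_capacity(n);
    let mut current = Vec::with_capacity(chunk_size);
    for item in items.drain(..) {
        current.push(item);
        if current.len() == chunk_size {
            // Swap in a fresh buffer and push the full one without copying.
            chunks.push(std::mem::replace(
                &mut current,
                Vec::with_capacity(chunk_size),
            ));
        }
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}

fn main() {
    // Five items into two chunks: elements are moved, not cloned, so this
    // also works for types that do not implement `Clone`.
    let chunks = split_into_chunks(vec![1, 2, 3, 4, 5], 2);
    assert_eq!(chunks, vec![vec![1, 2, 3], vec![4, 5]]);
}
```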
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index 44f9276..21a6061 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -82,6 +82,7 @@
/// An optional field for user defined per object metadata
pub extensions: Option<Arc<dyn std::any::Any + Send + Sync>>,
}
+
impl PartitionedFile {
/// Create a simple file without metadata or partition
pub fn new(path: impl Into<String>, size: u64) -> Self {
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 80f49e4..bb86ac3 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -973,15 +973,16 @@
// collect the statistics if required by the config
let files = file_list
.map(|part_file| async {
- let mut part_file = part_file?;
+ let part_file = part_file?;
if self.options.collect_stat {
let statistics =
self.do_collect_statistics(ctx, &store, &part_file).await?;
- part_file.statistics = Some(statistics.clone());
- Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)>
+ Ok((part_file, statistics))
} else {
- Ok((part_file, Statistics::new_unknown(&self.file_schema)))
- as Result<(PartitionedFile, Statistics)>
+ Ok((
+ part_file,
+ Arc::new(Statistics::new_unknown(&self.file_schema)),
+ ))
}
})
.boxed()
@@ -1011,12 +1012,12 @@
ctx: &SessionState,
store: &Arc<dyn ObjectStore>,
part_file: &PartitionedFile,
- ) -> Result<Statistics> {
- let statistics_cache = self.collected_statistics.clone();
- return match statistics_cache
+ ) -> Result<Arc<Statistics>> {
+ match self
+ .collected_statistics
.get_with_extra(&part_file.object_meta.location, &part_file.object_meta)
{
- Some(statistics) => Ok(statistics.as_ref().clone()),
+ Some(statistics) => Ok(statistics.clone()),
None => {
let statistics = self
.options
@@ -1028,14 +1029,15 @@
&part_file.object_meta,
)
.await?;
- statistics_cache.put_with_extra(
+ let statistics = Arc::new(statistics);
+ self.collected_statistics.put_with_extra(
&part_file.object_meta.location,
- statistics.clone().into(),
+ statistics.clone(),
&part_file.object_meta,
);
Ok(statistics)
}
- };
+ }
}
}
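
With the cache storing `Arc<Statistics>`, a cache hit now costs a refcount bump where it previously deep-copied via `statistics.as_ref().clone()`. A minimal sketch of the get-or-compute shape, using a plain `HashMap` and a hypothetical `compute_statistics` in place of the real statistics cache (keyed through `get_with_extra`/`put_with_extra`) and the format's `infer_stats` call:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-in for DataFusion's `Statistics`.
struct Statistics {
    num_rows: usize,
}

// Hypothetical placeholder for the expensive inference against the
// object store (`infer_stats` in the real code).
fn compute_statistics(path: &str) -> Statistics {
    Statistics { num_rows: path.len() }
}

fn get_or_collect(
    cache: &mut HashMap<String, Arc<Statistics>>,
    path: &str,
) -> Arc<Statistics> {
    match cache.get(path) {
        // Cache hit: cloning the `Arc` shares the allocation.
        Some(statistics) => Arc::clone(statistics),
        None => {
            let statistics = Arc::new(compute_statistics(path));
            cache.insert(path.to_string(), Arc::clone(&statistics));
            statistics
        }
    }
}

fn main() {
    let mut cache = HashMap::new();
    let first = get_or_collect(&mut cache, "part-0.parquet");
    let second = get_or_collect(&mut cache, "part-0.parquet");
    // The second lookup hits the cache and shares the same allocation.
    assert!(Arc::ptr_eq(&first, &second));
    assert_eq!(first.num_rows, second.num_rows);
}
```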
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index 8c789e4..9d031a6 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+use std::mem;
+use std::sync::Arc;
+
use super::listing::PartitionedFile;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
@@ -26,8 +29,6 @@
use datafusion_common::ScalarValue;
use futures::{Stream, StreamExt};
-use itertools::izip;
-use itertools::multiunzip;
/// Get all files as well as the file level summary statistics (no statistic for partition columns).
/// If the optional `limit` is provided, includes only sufficient files. Needed to read up to
@@ -35,7 +36,7 @@
/// `ListingTable`. If it is false we only construct bare statistics and skip a potentially expensive
/// call to `multiunzip` for constructing file level summary statistics.
pub async fn get_statistics_with_limit(
- all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
+ all_files: impl Stream<Item = Result<(PartitionedFile, Arc<Statistics>)>>,
file_schema: SchemaRef,
limit: Option<usize>,
collect_stats: bool,
@@ -48,9 +49,7 @@
// - zero for summations, and
// - neutral element for extreme points.
let size = file_schema.fields().len();
- let mut null_counts: Vec<Precision<usize>> = vec![Precision::Absent; size];
- let mut max_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
- let mut min_values: Vec<Precision<ScalarValue>> = vec![Precision::Absent; size];
+ let mut col_stats_set = vec![ColumnStatistics::default(); size];
let mut num_rows = Precision::<usize>::Absent;
let mut total_byte_size = Precision::<usize>::Absent;
@@ -58,16 +57,19 @@
let mut all_files = Box::pin(all_files.fuse());
if let Some(first_file) = all_files.next().await {
- let (file, file_stats) = first_file?;
+ let (mut file, file_stats) = first_file?;
+ file.statistics = Some(file_stats.as_ref().clone());
result_files.push(file);
// First file, we set them directly from the file statistics.
- num_rows = file_stats.num_rows;
- total_byte_size = file_stats.total_byte_size;
- for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() {
- null_counts[index] = file_column.null_count;
- max_values[index] = file_column.max_value;
- min_values[index] = file_column.min_value;
+ num_rows = file_stats.num_rows.clone();
+ total_byte_size = file_stats.total_byte_size.clone();
+ for (index, file_column) in
+ file_stats.column_statistics.clone().into_iter().enumerate()
+ {
+ col_stats_set[index].null_count = file_column.null_count;
+ col_stats_set[index].max_value = file_column.max_value;
+ col_stats_set[index].min_value = file_column.min_value;
}
// If the number of rows exceeds the limit, we can stop processing
@@ -80,7 +82,8 @@
};
if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
while let Some(current) = all_files.next().await {
- let (file, file_stats) = current?;
+ let (mut file, file_stats) = current?;
+ file.statistics = Some(file_stats.as_ref().clone());
result_files.push(file);
if !collect_stats {
continue;
@@ -90,38 +93,28 @@
// counts across all the files in question. If any file does not
// provide any information or provides an inexact value, we demote
// the statistic precision to inexact.
- num_rows = add_row_stats(file_stats.num_rows, num_rows);
+ num_rows = add_row_stats(file_stats.num_rows.clone(), num_rows);
total_byte_size =
- add_row_stats(file_stats.total_byte_size, total_byte_size);
+ add_row_stats(file_stats.total_byte_size.clone(), total_byte_size);
- (null_counts, max_values, min_values) = multiunzip(
- izip!(
- file_stats.column_statistics.into_iter(),
- null_counts.into_iter(),
- max_values.into_iter(),
- min_values.into_iter()
- )
- .map(
- |(
- ColumnStatistics {
- null_count: file_nc,
- max_value: file_max,
- min_value: file_min,
- distinct_count: _,
- },
- null_count,
- max_value,
- min_value,
- )| {
- (
- add_row_stats(file_nc, null_count),
- set_max_if_greater(file_max, max_value),
- set_min_if_lesser(file_min, min_value),
- )
- },
- ),
- );
+ for (file_col_stats, col_stats) in file_stats
+ .column_statistics
+ .iter()
+ .zip(col_stats_set.iter_mut())
+ {
+ let ColumnStatistics {
+ null_count: file_nc,
+ max_value: file_max,
+ min_value: file_min,
+ distinct_count: _,
+ } = file_col_stats;
+
+ col_stats.null_count =
+ add_row_stats(file_nc.clone(), col_stats.null_count.clone());
+ set_max_if_greater(file_max, &mut col_stats.max_value);
+ set_min_if_lesser(file_min, &mut col_stats.min_value)
+ }
// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
@@ -139,7 +132,7 @@
let mut statistics = Statistics {
num_rows,
total_byte_size,
- column_statistics: get_col_stats_vec(null_counts, max_values, min_values),
+ column_statistics: col_stats_set,
};
if all_files.next().await.is_some() {
// If we still have files in the stream, it means that the limit kicked
@@ -182,21 +175,6 @@
}
}
-pub(crate) fn get_col_stats_vec(
- null_counts: Vec<Precision<usize>>,
- max_values: Vec<Precision<ScalarValue>>,
- min_values: Vec<Precision<ScalarValue>>,
-) -> Vec<ColumnStatistics> {
- izip!(null_counts, max_values, min_values)
- .map(|(null_count, max_value, min_value)| ColumnStatistics {
- null_count,
- max_value,
- min_value,
- distinct_count: Precision::Absent,
- })
- .collect()
-}
-
pub(crate) fn get_col_stats(
schema: &Schema,
null_counts: Vec<Precision<usize>>,
@@ -238,45 +216,61 @@
/// If the given value is numerically greater than the original maximum value,
/// return the new maximum value with appropriate exactness information.
fn set_max_if_greater(
- max_nominee: Precision<ScalarValue>,
- max_values: Precision<ScalarValue>,
-) -> Precision<ScalarValue> {
- match (&max_values, &max_nominee) {
- (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => max_nominee,
+ max_nominee: &Precision<ScalarValue>,
+ max_value: &mut Precision<ScalarValue>,
+) {
+ match (&max_value, max_nominee) {
+ (Precision::Exact(val1), Precision::Exact(val2)) if val1 < val2 => {
+ *max_value = max_nominee.clone();
+ }
(Precision::Exact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Exact(val2))
if val1 < val2 =>
{
- max_nominee.to_inexact()
+ *max_value = max_nominee.clone().to_inexact();
}
- (Precision::Exact(_), Precision::Absent) => max_values.to_inexact(),
- (Precision::Absent, Precision::Exact(_)) => max_nominee.to_inexact(),
- (Precision::Absent, Precision::Inexact(_)) => max_nominee,
- (Precision::Absent, Precision::Absent) => Precision::Absent,
- _ => max_values,
+ (Precision::Exact(_), Precision::Absent) => {
+ let exact_max = mem::take(max_value);
+ *max_value = exact_max.to_inexact();
+ }
+ (Precision::Absent, Precision::Exact(_)) => {
+ *max_value = max_nominee.clone().to_inexact();
+ }
+ (Precision::Absent, Precision::Inexact(_)) => {
+ *max_value = max_nominee.clone();
+ }
+ _ => {}
}
}
/// If the given value is numerically lesser than the original minimum value,
/// return the new minimum value with appropriate exactness information.
fn set_min_if_lesser(
- min_nominee: Precision<ScalarValue>,
- min_values: Precision<ScalarValue>,
-) -> Precision<ScalarValue> {
- match (&min_values, &min_nominee) {
- (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => min_nominee,
+ min_nominee: &Precision<ScalarValue>,
+ min_value: &mut Precision<ScalarValue>,
+) {
+ match (&min_value, min_nominee) {
+ (Precision::Exact(val1), Precision::Exact(val2)) if val1 > val2 => {
+ *min_value = min_nominee.clone();
+ }
(Precision::Exact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Inexact(val2))
| (Precision::Inexact(val1), Precision::Exact(val2))
if val1 > val2 =>
{
- min_nominee.to_inexact()
+ *min_value = min_nominee.clone().to_inexact();
}
- (Precision::Exact(_), Precision::Absent) => min_values.to_inexact(),
- (Precision::Absent, Precision::Exact(_)) => min_nominee.to_inexact(),
- (Precision::Absent, Precision::Inexact(_)) => min_nominee,
- (Precision::Absent, Precision::Absent) => Precision::Absent,
- _ => min_values,
+ (Precision::Exact(_), Precision::Absent) => {
+ let exact_min = mem::take(min_value);
+ *min_value = exact_min.to_inexact();
+ }
+ (Precision::Absent, Precision::Exact(_)) => {
+ *min_value = min_nominee.clone().to_inexact();
+ }
+ (Precision::Absent, Precision::Inexact(_)) => {
+ *min_value = min_nominee.clone();
+ }
+ _ => {}
}
}
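
Both setters now mutate the `Precision` in place instead of consuming and returning it, which lets `get_statistics_with_limit` update `col_stats_set` entries directly. The `(Exact, Absent)` arm uses `mem::take` to demote an exact bound to inexact without cloning the contained `ScalarValue`: `Precision`'s `Default` is `Absent`, so `take` moves the value out and leaves a valid placeholder behind. A compilable sketch with a simplified, non-generic `Precision`, abridged to the arms that show the trick:

```rust
use std::mem;

// Simplified, non-generic stand-in for DataFusion's `Precision<ScalarValue>`.
// Like the real type, its `Default` is `Absent`, which is what makes
// `mem::take` usable below.
#[derive(Clone, Debug, PartialEq, Default)]
enum Precision {
    #[default]
    Absent,
    Inexact(i64),
    Exact(i64),
}

impl Precision {
    fn to_inexact(self) -> Self {
        match self {
            Precision::Exact(v) => Precision::Inexact(v),
            other => other,
        }
    }
}

/// Abridged in-place max update mirroring `set_max_if_greater` above.
fn set_max_if_greater(max_nominee: &Precision, max_value: &mut Precision) {
    match (&max_value, max_nominee) {
        (Precision::Exact(v1), Precision::Exact(v2)) if v1 < v2 => {
            *max_value = max_nominee.clone();
        }
        (Precision::Exact(_), Precision::Absent) => {
            // Take ownership (leaving `Absent` behind), then write the
            // demoted value back; no clone of the contained value.
            let exact_max = mem::take(max_value);
            *max_value = exact_max.to_inexact();
        }
        (Precision::Absent, Precision::Exact(v)) => {
            *max_value = Precision::Inexact(*v);
        }
        _ => {}
    }
}

fn main() {
    let mut max = Precision::Exact(10);
    set_max_if_greater(&Precision::Exact(42), &mut max);
    assert_eq!(max, Precision::Exact(42));

    // An absent nominee demotes a known-exact maximum to inexact.
    set_max_if_greater(&Precision::Absent, &mut max);
    assert_eq!(max, Precision::Inexact(42));
}
```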