| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Execution plan for reading Parquet files |
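//!
//! [`ParquetExec`] reads one or more Parquet files, exposing each group of files
//! as one output partition (see [`ParquetPartition`]). When a filter expression
//! is supplied, a [`RowGroupPredicateBuilder`] rewrites it against row group
//! min/max statistics so that entire row groups can be skipped before any data
//! is read.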
| |
| use std::fmt; |
| use std::fs::File; |
| use std::sync::Arc; |
| use std::task::{Context, Poll}; |
| use std::{ |
| any::Any, |
| collections::{HashMap, HashSet}, |
| }; |
| |
| use super::{ |
| planner::DefaultPhysicalPlanner, ColumnarValue, PhysicalExpr, RecordBatchStream, |
| SendableRecordBatchStream, |
| }; |
| use crate::physical_plan::{common, ExecutionPlan, Partitioning}; |
| use crate::{ |
| error::{DataFusionError, Result}, |
| execution::context::ExecutionContextState, |
| logical_plan::{Expr, Operator}, |
| optimizer::utils, |
| prelude::ExecutionConfig, |
| }; |
| use arrow::record_batch::RecordBatch; |
| use arrow::{ |
| array::new_null_array, |
| error::{ArrowError, Result as ArrowResult}, |
| }; |
| use arrow::{ |
| array::{make_array, ArrayData, ArrayRef, BooleanArray, BooleanBufferBuilder}, |
| buffer::MutableBuffer, |
| datatypes::{DataType, Field, Schema, SchemaRef}, |
| }; |
| use parquet::file::{ |
| metadata::RowGroupMetaData, |
| reader::{FileReader, SerializedFileReader}, |
| statistics::Statistics as ParquetStatistics, |
| }; |
| |
| use fmt::Debug; |
| use parquet::arrow::{ArrowReader, ParquetFileArrowReader}; |
| use tokio::{ |
| sync::mpsc::{channel, Receiver, Sender}, |
| task, |
| }; |
| use tokio_stream::wrappers::ReceiverStream; |
| |
| use crate::datasource::datasource::Statistics; |
| use async_trait::async_trait; |
| use futures::stream::{Stream, StreamExt}; |
| |
| /// Execution plan for scanning one or more Parquet partitions |
| #[derive(Debug, Clone)] |
| pub struct ParquetExec { |
| /// Parquet partitions to read |
| partitions: Vec<ParquetPartition>, |
| /// Schema after projection is applied |
| schema: SchemaRef, |
| /// Projection for which columns to load |
| projection: Vec<usize>, |
| /// Batch size |
| batch_size: usize, |
| /// Statistics for the data set (sum of statistics for all partitions) |
| statistics: Statistics, |
| /// Optional predicate builder |
| predicate_builder: Option<RowGroupPredicateBuilder>, |
| } |
| |
/// Represents one partition of a Parquet data set: currently a group of one or
/// more Parquet files that are read sequentially by a single task.
| /// |
| /// In the future it would be good to support subsets of files based on ranges of row groups |
| /// so that we can better parallelize reads of large files across available cores (see |
| /// [ARROW-10995](https://issues.apache.org/jira/browse/ARROW-10995)). |
| /// |
| /// We may also want to support reading Parquet files that are partitioned based on a key and |
| /// in this case we would want this partition struct to represent multiple files for a given |
| /// partition key (see [ARROW-11019](https://issues.apache.org/jira/browse/ARROW-11019)). |
| #[derive(Debug, Clone)] |
| pub struct ParquetPartition { |
    /// The Parquet filenames to be read by this partition
| filenames: Vec<String>, |
| /// Statistics for this partition |
| statistics: Statistics, |
| } |
| |
| impl ParquetExec { |
| /// Create a new Parquet reader execution plan based on the specified Parquet filename or |
| /// directory containing Parquet files |
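    ///
    /// A minimal usage sketch (the path below is hypothetical, so the example
    /// is not compiled):
    ///
    /// ```ignore
    /// let exec = ParquetExec::try_from_path(
    ///     "/data/my_table.parquet", // hypothetical path: a file or directory
    ///     Some(vec![0, 2]),         // read only the first and third columns
    ///     None,                     // no predicate: read all row groups
    ///     1024,                     // batch size
    ///     4,                        // max_concurrency
    /// )?;
    /// ```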
| pub fn try_from_path( |
| path: &str, |
| projection: Option<Vec<usize>>, |
| predicate: Option<Expr>, |
| batch_size: usize, |
| max_concurrency: usize, |
| ) -> Result<Self> { |
| // build a list of filenames from the specified path, which could be a single file or |
| // a directory containing one or more parquet files |
| let mut filenames: Vec<String> = vec![]; |
| common::build_file_list(path, &mut filenames, ".parquet")?; |
| if filenames.is_empty() { |
| Err(DataFusionError::Plan(format!( |
| "No Parquet files found at path {}", |
| path |
| ))) |
| } else { |
| let filenames = filenames |
| .iter() |
| .map(|filename| filename.as_str()) |
| .collect::<Vec<&str>>(); |
| Self::try_from_files( |
| &filenames, |
| projection, |
| predicate, |
| batch_size, |
| max_concurrency, |
| ) |
| } |
| } |
| |
| /// Create a new Parquet reader execution plan based on the specified list of Parquet |
| /// files |
| pub fn try_from_files( |
| filenames: &[&str], |
| projection: Option<Vec<usize>>, |
| predicate: Option<Expr>, |
| batch_size: usize, |
| max_concurrency: usize, |
| ) -> Result<Self> { |
| // build a list of Parquet partitions with statistics and gather all unique schemas |
| // used in this data set |
| let mut schemas: Vec<Schema> = vec![]; |
| let mut partitions = Vec::with_capacity(max_concurrency); |
| let filenames: Vec<String> = filenames.iter().map(|s| s.to_string()).collect(); |
| let chunks = split_files(&filenames, max_concurrency); |
        for chunk in chunks {
            // these statistics are tracked per partition, so that summing the
            // partition statistics yields the totals for the whole data set
            let mut num_rows = 0;
            let mut total_byte_size = 0;
            let filenames: Vec<String> = chunk.iter().map(|x| x.to_string()).collect();
| for filename in &filenames { |
| let file = File::open(filename)?; |
| let file_reader = Arc::new(SerializedFileReader::new(file)?); |
| let mut arrow_reader = ParquetFileArrowReader::new(file_reader); |
| let meta_data = arrow_reader.get_metadata(); |
                // collect all the unique schemas in this data set
                let schema = arrow_reader.get_schema()?;
                if !schemas.contains(&schema) {
                    schemas.push(schema);
                }
| for i in 0..meta_data.num_row_groups() { |
| let row_group_meta = meta_data.row_group(i); |
| num_rows += row_group_meta.num_rows(); |
| total_byte_size += row_group_meta.total_byte_size(); |
| } |
| } |
| let statistics = Statistics { |
| num_rows: Some(num_rows as usize), |
| total_byte_size: Some(total_byte_size as usize), |
| column_statistics: None, |
| }; |
| partitions.push(ParquetPartition { |
| filenames, |
| statistics, |
| }); |
| } |
| |
        // we currently get the schema information from the first file rather than
        // performing schema merging; this is a known limitation.
        // See https://issues.apache.org/jira/browse/ARROW-11017
| if schemas.len() > 1 { |
| return Err(DataFusionError::Plan(format!( |
| "The Parquet files have {} different schemas and DataFusion does \ |
| not yet support schema merging", |
| schemas.len() |
| ))); |
| } |
| let schema = schemas[0].clone(); |
| let predicate_builder = predicate.and_then(|predicate_expr| { |
| RowGroupPredicateBuilder::try_new(&predicate_expr, schema.clone()).ok() |
| }); |
| |
| Ok(Self::new( |
| partitions, |
| schema, |
| projection, |
| predicate_builder, |
| batch_size, |
| )) |
| } |
| |
| /// Create a new Parquet reader execution plan with provided partitions and schema |
| pub fn new( |
| partitions: Vec<ParquetPartition>, |
| schema: Schema, |
| projection: Option<Vec<usize>>, |
| predicate_builder: Option<RowGroupPredicateBuilder>, |
| batch_size: usize, |
| ) -> Self { |
| let projection = match projection { |
| Some(p) => p, |
| None => (0..schema.fields().len()).collect(), |
| }; |
| |
| let projected_schema = Schema::new( |
| projection |
| .iter() |
| .map(|i| schema.field(*i).clone()) |
| .collect(), |
| ); |
| |
| // sum the statistics |
| let mut num_rows: Option<usize> = None; |
| let mut total_byte_size: Option<usize> = None; |
| for part in &partitions { |
| if let Some(n) = part.statistics.num_rows { |
| num_rows = Some(num_rows.unwrap_or(0) + n) |
| } |
| if let Some(n) = part.statistics.total_byte_size { |
| total_byte_size = Some(total_byte_size.unwrap_or(0) + n) |
| } |
| } |
| let statistics = Statistics { |
| num_rows, |
| total_byte_size, |
| column_statistics: None, |
| }; |
| Self { |
| partitions, |
| schema: Arc::new(projected_schema), |
| projection, |
| predicate_builder, |
| batch_size, |
| statistics, |
| } |
| } |
| |
| /// Parquet partitions to read |
| pub fn partitions(&self) -> &[ParquetPartition] { |
| &self.partitions |
| } |
| |
| /// Projection for which columns to load |
| pub fn projection(&self) -> &[usize] { |
| &self.projection |
| } |
| |
| /// Batch size |
| pub fn batch_size(&self) -> usize { |
| self.batch_size |
| } |
| |
| /// Statistics for the data set (sum of statistics for all partitions) |
| pub fn statistics(&self) -> &Statistics { |
| &self.statistics |
| } |
| } |
| |
| impl ParquetPartition { |
| /// Create a new parquet partition |
| pub fn new(filenames: Vec<String>, statistics: Statistics) -> Self { |
| Self { |
| filenames, |
| statistics, |
| } |
| } |
| |
    /// The Parquet filenames to be read by this partition
| pub fn filenames(&self) -> &[String] { |
| &self.filenames |
| } |
| |
| /// Statistics for this partition |
| pub fn statistics(&self) -> &Statistics { |
| &self.statistics |
| } |
| } |
| |
| #[derive(Debug, Clone)] |
/// Builds predicate functions that filter row group metadata using min/max statistics
| pub struct RowGroupPredicateBuilder { |
| parquet_schema: Schema, |
| predicate_expr: Arc<dyn PhysicalExpr>, |
| stat_column_req: Vec<(String, StatisticsType, Field)>, |
| } |
| |
| impl RowGroupPredicateBuilder { |
    /// Try to create a new instance of RowGroupPredicateBuilder.
    /// This will translate the filter expression into a statistics predicate expression
    /// (for example `(column / 2) = 4` becomes `(column_min / 2) <= 4 && 4 <= (column_max / 2)`),
    /// then convert it to a DataFusion PhysicalExpr and cache it for later use by
    /// `build_row_group_predicate`.
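    ///
    /// A sketch of typical usage, mirroring the tests below (not compiled here):
    ///
    /// ```ignore
    /// use crate::logical_plan::{col, lit};
    /// // c1 > 15 is rewritten to the statistics predicate c1_max > 15
    /// let expr = col("c1").gt(lit(15));
    /// let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]);
    /// let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?;
    /// ```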
| pub fn try_new(expr: &Expr, parquet_schema: Schema) -> Result<Self> { |
| // build predicate expression once |
| let mut stat_column_req = Vec::<(String, StatisticsType, Field)>::new(); |
| let logical_predicate_expr = |
| build_predicate_expression(expr, &parquet_schema, &mut stat_column_req)?; |
| // build physical predicate expression |
| let stat_fields = stat_column_req |
| .iter() |
| .map(|(_, _, f)| f.clone()) |
| .collect::<Vec<_>>(); |
| let stat_schema = Schema::new(stat_fields); |
| let execution_context_state = ExecutionContextState { |
| datasources: HashMap::new(), |
| scalar_functions: HashMap::new(), |
| var_provider: HashMap::new(), |
| aggregate_functions: HashMap::new(), |
| config: ExecutionConfig::new(), |
| }; |
| let predicate_expr = DefaultPhysicalPlanner::default().create_physical_expr( |
| &logical_predicate_expr, |
| &stat_schema, |
| &execution_context_state, |
| )?; |
| Ok(Self { |
| parquet_schema, |
| predicate_expr, |
| stat_column_req, |
| }) |
| } |
| |
    /// Generate a predicate function used to filter row group metadata.
    /// This function takes a list of all row groups as a parameter so that
    /// DataFusion's physical expressions can be re-used: a RecordBatch
    /// containing one statistics array per requested column is built, and the
    /// physical predicate expression is evaluated against it to produce a row
    /// group filter array.
    /// The generated filter array is then used in the returned closure to filter row groups.
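    ///
    /// A sketch of how the returned closure is used when opening a file
    /// (see `read_files` below):
    ///
    /// ```ignore
    /// let row_group_predicate = predicate_builder
    ///     .build_row_group_predicate(file_reader.metadata().row_groups());
    /// file_reader.filter_row_groups(&row_group_predicate);
    /// ```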
| pub fn build_row_group_predicate( |
| &self, |
| row_group_metadata: &[RowGroupMetaData], |
| ) -> Box<dyn Fn(&RowGroupMetaData, usize) -> bool> { |
| // build statistics record batch |
| let predicate_result = build_statistics_record_batch( |
| row_group_metadata, |
| &self.parquet_schema, |
| &self.stat_column_req, |
| ) |
| .and_then(|statistics_batch| { |
| // execute predicate expression |
| self.predicate_expr.evaluate(&statistics_batch) |
| }) |
| .and_then(|v| match v { |
| ColumnarValue::Array(array) => Ok(array), |
| ColumnarValue::Scalar(_) => Err(DataFusionError::Plan( |
| "predicate expression didn't return an array".to_string(), |
| )), |
| }); |
| |
| let predicate_array = match predicate_result { |
| Ok(array) => array, |
| // row group filter array could not be built |
| // return a closure which will not filter out any row groups |
| _ => return Box::new(|_r, _i| true), |
| }; |
| |
| let predicate_array = predicate_array.as_any().downcast_ref::<BooleanArray>(); |
| match predicate_array { |
| // return row group predicate function |
| Some(array) => { |
| // when the result of the predicate expression for a row group is null / undefined, |
| // e.g. due to missing statistics, this row group can't be filtered out, |
| // so replace with true |
| let predicate_values = |
| array.iter().map(|x| x.unwrap_or(true)).collect::<Vec<_>>(); |
| Box::new(move |_, i| predicate_values[i]) |
| } |
| // predicate result is not a BooleanArray |
| // return a closure which will not filter out any row groups |
| _ => Box::new(|_r, _i| true), |
| } |
| } |
| } |
| |
/// Build a RecordBatch from a list of RowGroupMetaData structs, creating one
/// array per statistics column requested in the stat_column_req parameter.
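///
/// For example, a request for the minimum of `c1` over two row groups whose
/// statistics report minimums of 1 and 11 yields a single-column batch
/// `c1_min = [1, 11]`, with one row per row group.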
| fn build_statistics_record_batch( |
| row_groups: &[RowGroupMetaData], |
| parquet_schema: &Schema, |
| stat_column_req: &[(String, StatisticsType, Field)], |
| ) -> Result<RecordBatch> { |
| let mut fields = Vec::<Field>::new(); |
| let mut arrays = Vec::<ArrayRef>::new(); |
| for (column_name, statistics_type, stat_field) in stat_column_req { |
| if let Some((column_index, _)) = parquet_schema.column_with_name(column_name) { |
| let statistics = row_groups |
| .iter() |
| .map(|g| g.column(column_index).statistics()) |
| .collect::<Vec<_>>(); |
| let array = build_statistics_array( |
| &statistics, |
| *statistics_type, |
| stat_field.data_type(), |
| ); |
| fields.push(stat_field.clone()); |
| arrays.push(array); |
| } |
| } |
| let schema = Arc::new(Schema::new(fields)); |
| RecordBatch::try_new(schema, arrays) |
| .map_err(|err| DataFusionError::Plan(err.to_string())) |
| } |
| |
| struct StatisticsExpressionBuilder<'a> { |
| column_name: String, |
| column_expr: &'a Expr, |
| scalar_expr: &'a Expr, |
| parquet_field: &'a Field, |
| stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>, |
| reverse_operator: bool, |
| } |
| |
| impl<'a> StatisticsExpressionBuilder<'a> { |
| fn try_new( |
| left: &'a Expr, |
| right: &'a Expr, |
| parquet_schema: &'a Schema, |
| stat_column_req: &'a mut Vec<(String, StatisticsType, Field)>, |
| ) -> Result<Self> { |
| // find column name; input could be a more complicated expression |
| let mut left_columns = HashSet::<String>::new(); |
| utils::expr_to_column_names(left, &mut left_columns)?; |
| let mut right_columns = HashSet::<String>::new(); |
| utils::expr_to_column_names(right, &mut right_columns)?; |
| let (column_expr, scalar_expr, column_names, reverse_operator) = |
| match (left_columns.len(), right_columns.len()) { |
| (1, 0) => (left, right, left_columns, false), |
| (0, 1) => (right, left, right_columns, true), |
| _ => { |
| // if more than one column used in expression - not supported |
| return Err(DataFusionError::Plan( |
| "Multi-column expressions are not currently supported" |
| .to_string(), |
| )); |
| } |
| }; |
| let column_name = column_names.iter().next().unwrap().clone(); |
| let field = match parquet_schema.column_with_name(&column_name) { |
| Some((_, f)) => f, |
| _ => { |
| // field not found in parquet schema |
| return Err(DataFusionError::Plan( |
| "Field not found in parquet schema".to_string(), |
| )); |
| } |
| }; |
| |
| Ok(Self { |
| column_name, |
| column_expr, |
| scalar_expr, |
| parquet_field: field, |
| stat_column_req, |
| reverse_operator, |
| }) |
| } |
| |
| fn correct_operator(&self, op: Operator) -> Operator { |
| if !self.reverse_operator { |
| return op; |
| } |
| |
| match op { |
| Operator::Lt => Operator::Gt, |
| Operator::Gt => Operator::Lt, |
| Operator::LtEq => Operator::GtEq, |
| Operator::GtEq => Operator::LtEq, |
| _ => op, |
| } |
| } |
| |
| fn scalar_expr(&self) -> &Expr { |
| self.scalar_expr |
| } |
| |
    fn is_stat_column_missing(&self, statistics_type: StatisticsType) -> bool {
        !self
            .stat_column_req
            .iter()
            .any(|(c, t, _f)| c == &self.column_name && t == &statistics_type)
    }
| |
| fn stat_column_expr( |
| &mut self, |
| stat_type: StatisticsType, |
| suffix: &str, |
| ) -> Result<Expr> { |
| let stat_column_name = format!("{}_{}", self.column_name, suffix); |
| let stat_field = Field::new( |
| stat_column_name.as_str(), |
| self.parquet_field.data_type().clone(), |
| self.parquet_field.is_nullable(), |
| ); |
| if self.is_stat_column_missing(stat_type) { |
| // only add statistics column if not previously added |
| self.stat_column_req |
| .push((self.column_name.clone(), stat_type, stat_field)); |
| } |
| rewrite_column_expr( |
| self.column_expr, |
| self.column_name.as_str(), |
| stat_column_name.as_str(), |
| ) |
| } |
| |
| fn min_column_expr(&mut self) -> Result<Expr> { |
| self.stat_column_expr(StatisticsType::Min, "min") |
| } |
| |
| fn max_column_expr(&mut self) -> Result<Expr> { |
| self.stat_column_expr(StatisticsType::Max, "max") |
| } |
| } |
| |
/// Replaces every occurrence of the column `column_old_name` with
/// `column_new_name` in the given expression
| fn rewrite_column_expr( |
| expr: &Expr, |
| column_old_name: &str, |
| column_new_name: &str, |
| ) -> Result<Expr> { |
| let expressions = utils::expr_sub_expressions(&expr)?; |
| let expressions = expressions |
| .iter() |
| .map(|e| rewrite_column_expr(e, column_old_name, column_new_name)) |
| .collect::<Result<Vec<_>>>()?; |
| |
| if let Expr::Column(name) = expr { |
| if name == column_old_name { |
| return Ok(Expr::Column(column_new_name.to_string())); |
| } |
| } |
| utils::rewrite_expression(&expr, &expressions) |
| } |
| |
| /// Translate logical filter expression into parquet statistics predicate expression |
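///
/// For example, `c1 > 5 AND c2 = 7` is rewritten to
/// `c1_max > 5 AND (c2_min <= 7 AND 7 <= c2_max)`; sub-expressions that cannot
/// be translated (such as multi-column comparisons) are replaced with `true`
/// so that they never filter out a row group.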
| fn build_predicate_expression( |
| expr: &Expr, |
| parquet_schema: &Schema, |
| stat_column_req: &mut Vec<(String, StatisticsType, Field)>, |
| ) -> Result<Expr> { |
| use crate::logical_plan; |
| // predicate expression can only be a binary expression |
| let (left, op, right) = match expr { |
| Expr::BinaryExpr { left, op, right } => (left, *op, right), |
| _ => { |
| // unsupported expression - replace with TRUE |
| // this can still be useful when multiple conditions are joined using AND |
| // such as: column > 10 AND TRUE |
| return Ok(logical_plan::lit(true)); |
| } |
| }; |
| |
| if op == Operator::And || op == Operator::Or { |
| let left_expr = |
| build_predicate_expression(left, parquet_schema, stat_column_req)?; |
| let right_expr = |
| build_predicate_expression(right, parquet_schema, stat_column_req)?; |
| return Ok(logical_plan::binary_expr(left_expr, op, right_expr)); |
| } |
| |
| let expr_builder = StatisticsExpressionBuilder::try_new( |
| left, |
| right, |
| parquet_schema, |
| stat_column_req, |
| ); |
| let mut expr_builder = match expr_builder { |
| Ok(builder) => builder, |
| // allow partial failure in predicate expression generation |
| // this can still produce a useful predicate when multiple conditions are joined using AND |
| Err(_) => { |
| return Ok(logical_plan::lit(true)); |
| } |
| }; |
| let corrected_op = expr_builder.correct_operator(op); |
| let statistics_expr = match corrected_op { |
| Operator::Eq => { |
| // column = literal => (min, max) = literal => min <= literal && literal <= max |
| // (column / 2) = 4 => (column_min / 2) <= 4 && 4 <= (column_max / 2) |
| let min_column_expr = expr_builder.min_column_expr()?; |
| let max_column_expr = expr_builder.max_column_expr()?; |
| min_column_expr |
| .lt_eq(expr_builder.scalar_expr().clone()) |
| .and(expr_builder.scalar_expr().clone().lt_eq(max_column_expr)) |
| } |
| Operator::Gt => { |
| // column > literal => (min, max) > literal => max > literal |
| expr_builder |
| .max_column_expr()? |
| .gt(expr_builder.scalar_expr().clone()) |
| } |
| Operator::GtEq => { |
| // column >= literal => (min, max) >= literal => max >= literal |
| expr_builder |
| .max_column_expr()? |
| .gt_eq(expr_builder.scalar_expr().clone()) |
| } |
| Operator::Lt => { |
| // column < literal => (min, max) < literal => min < literal |
| expr_builder |
| .min_column_expr()? |
| .lt(expr_builder.scalar_expr().clone()) |
| } |
| Operator::LtEq => { |
| // column <= literal => (min, max) <= literal => min <= literal |
| expr_builder |
| .min_column_expr()? |
| .lt_eq(expr_builder.scalar_expr().clone()) |
| } |
| // other expressions are not supported |
| _ => logical_plan::lit(true), |
| }; |
| Ok(statistics_expr) |
| } |
| |
| #[derive(Debug, Copy, Clone, PartialEq)] |
| enum StatisticsType { |
| Min, |
| Max, |
| } |
| |
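/// Build a single array of row group statistics values (one entry per row
/// group) for the requested `statistics_type` (min or max). Row groups without
/// usable statistics become null entries, and the raw statistics bytes are
/// cast to `data_type` when the underlying Parquet physical type differs.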
| fn build_statistics_array( |
| statistics: &[Option<&ParquetStatistics>], |
| statistics_type: StatisticsType, |
| data_type: &DataType, |
| ) -> ArrayRef { |
| let statistics_count = statistics.len(); |
| let first_group_stats = statistics.iter().find(|s| s.is_some()); |
| let first_group_stats = if let Some(Some(statistics)) = first_group_stats { |
| // found first row group with statistics defined |
| statistics |
| } else { |
| // no row group has statistics defined |
| return new_null_array(data_type, statistics_count); |
| }; |
| |
| let (data_size, arrow_type) = match first_group_stats { |
| ParquetStatistics::Int32(_) => (std::mem::size_of::<i32>(), DataType::Int32), |
| ParquetStatistics::Int64(_) => (std::mem::size_of::<i64>(), DataType::Int64), |
| ParquetStatistics::Float(_) => (std::mem::size_of::<f32>(), DataType::Float32), |
| ParquetStatistics::Double(_) => (std::mem::size_of::<f64>(), DataType::Float64), |
| ParquetStatistics::ByteArray(_) if data_type == &DataType::Utf8 => { |
| (0, DataType::Utf8) |
| } |
| _ => { |
| // type of statistics not supported |
| return new_null_array(data_type, statistics_count); |
| } |
| }; |
| |
| let statistics = statistics.iter().map(|s| { |
| s.filter(|s| s.has_min_max_set()) |
| .map(|s| match statistics_type { |
| StatisticsType::Min => s.min_bytes(), |
| StatisticsType::Max => s.max_bytes(), |
| }) |
| }); |
| |
| if arrow_type == DataType::Utf8 { |
| let data_size = statistics |
| .clone() |
| .map(|x| x.map(|b| b.len()).unwrap_or(0)) |
| .sum(); |
| let mut builder = |
| arrow::array::StringBuilder::with_capacity(statistics_count, data_size); |
| let string_statistics = |
| statistics.map(|x| x.and_then(|bytes| std::str::from_utf8(bytes).ok())); |
| for maybe_string in string_statistics { |
| match maybe_string { |
| Some(string_value) => builder.append_value(string_value).unwrap(), |
| None => builder.append_null().unwrap(), |
| }; |
| } |
| return Arc::new(builder.finish()); |
| } |
| |
| let mut data_buffer = MutableBuffer::new(statistics_count * data_size); |
| let mut bitmap_builder = BooleanBufferBuilder::new(statistics_count); |
| let mut null_count = 0; |
| for s in statistics { |
| if let Some(stat_data) = s { |
| bitmap_builder.append(true); |
| data_buffer.extend_from_slice(stat_data); |
| } else { |
| bitmap_builder.append(false); |
| data_buffer.resize(data_buffer.len() + data_size, 0); |
| null_count += 1; |
| } |
| } |
| |
| let mut builder = ArrayData::builder(arrow_type) |
| .len(statistics_count) |
| .add_buffer(data_buffer.into()); |
| if null_count > 0 { |
| builder = builder.null_bit_buffer(bitmap_builder.finish()); |
| } |
| let array_data = builder.build(); |
| let statistics_array = make_array(array_data); |
| if statistics_array.data_type() == data_type { |
| return statistics_array; |
| } |
| // cast statistics array to required data type |
| arrow::compute::cast(&statistics_array, data_type) |
| .unwrap_or_else(|_| new_null_array(data_type, statistics_count)) |
| } |
| |
| #[async_trait] |
| impl ExecutionPlan for ParquetExec { |
| /// Return a reference to Any that can be used for downcasting |
| fn as_any(&self) -> &dyn Any { |
| self |
| } |
| |
| fn schema(&self) -> SchemaRef { |
| self.schema.clone() |
| } |
| |
| fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> { |
| // this is a leaf node and has no children |
| vec![] |
| } |
| |
| /// Get the output partitioning of this plan |
| fn output_partitioning(&self) -> Partitioning { |
| Partitioning::UnknownPartitioning(self.partitions.len()) |
| } |
| |
| fn with_new_children( |
| &self, |
| children: Vec<Arc<dyn ExecutionPlan>>, |
| ) -> Result<Arc<dyn ExecutionPlan>> { |
| if children.is_empty() { |
| Ok(Arc::new(self.clone())) |
| } else { |
| Err(DataFusionError::Internal(format!( |
| "Children cannot be replaced in {:?}", |
| self |
| ))) |
| } |
| } |
| |
| async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> { |
        // the parquet reader performs blocking I/O and is not async, so it is
        // executed on a dedicated blocking thread and record batches are sent
        // back over a channel
| let (response_tx, response_rx): ( |
| Sender<ArrowResult<RecordBatch>>, |
| Receiver<ArrowResult<RecordBatch>>, |
| ) = channel(2); |
| |
| let filenames = self.partitions[partition].filenames.clone(); |
| let projection = self.projection.clone(); |
| let predicate_builder = self.predicate_builder.clone(); |
| let batch_size = self.batch_size; |
| |
| task::spawn_blocking(move || { |
| if let Err(e) = read_files( |
| &filenames, |
| &projection, |
| &predicate_builder, |
| batch_size, |
| response_tx, |
| ) { |
| println!("Parquet reader thread terminated due to error: {:?}", e); |
| } |
| }); |
| |
| Ok(Box::pin(ParquetStream { |
| schema: self.schema.clone(), |
| inner: ReceiverStream::new(response_rx), |
| })) |
| } |
| } |
| |
| fn send_result( |
| response_tx: &Sender<ArrowResult<RecordBatch>>, |
| result: ArrowResult<RecordBatch>, |
| ) -> Result<()> { |
    // Note this function is running on its own blocking tokio thread so blocking here is ok.
| response_tx |
| .blocking_send(result) |
| .map_err(|e| DataFusionError::Execution(e.to_string()))?; |
| Ok(()) |
| } |
| |
| fn read_files( |
| filenames: &[String], |
| projection: &[usize], |
| predicate_builder: &Option<RowGroupPredicateBuilder>, |
| batch_size: usize, |
| response_tx: Sender<ArrowResult<RecordBatch>>, |
| ) -> Result<()> { |
| for filename in filenames { |
| let file = File::open(&filename)?; |
| let mut file_reader = SerializedFileReader::new(file)?; |
| if let Some(predicate_builder) = predicate_builder { |
| let row_group_predicate = predicate_builder |
| .build_row_group_predicate(file_reader.metadata().row_groups()); |
| file_reader.filter_row_groups(&row_group_predicate); |
| } |
| let mut arrow_reader = ParquetFileArrowReader::new(Arc::new(file_reader)); |
| let mut batch_reader = arrow_reader |
| .get_record_reader_by_columns(projection.to_owned(), batch_size)?; |
| loop { |
| match batch_reader.next() { |
| Some(Ok(batch)) => { |
| //println!("ParquetExec got new batch from {}", filename); |
| send_result(&response_tx, Ok(batch))? |
| } |
| None => { |
| break; |
| } |
| Some(Err(e)) => { |
                    let err_msg =
                        format!("Error reading batch from {}: {}", filename, e);
| // send error to operator |
| send_result( |
| &response_tx, |
| Err(ArrowError::ParquetError(err_msg.clone())), |
| )?; |
| // terminate thread with error |
| return Err(DataFusionError::Execution(err_msg)); |
| } |
| } |
| } |
| } |
| |
| // finished reading files (dropping response_tx will close |
| // channel) |
| Ok(()) |
| } |
| |
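/// Split `filenames` into at most `n` chunks of roughly equal size; for
/// example, 5 files split 2 ways yield chunks of 3 and 2 files, and when `n`
/// exceeds the number of files each chunk holds a single file.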
fn split_files(filenames: &[String], n: usize) -> Vec<&[String]> {
    if filenames.is_empty() {
        return vec![];
    }
    // ceiling division, so that no more than n chunks are produced
    let chunk_size = (filenames.len() + n - 1) / n;
    filenames.chunks(chunk_size).collect()
}
| |
| struct ParquetStream { |
| schema: SchemaRef, |
| inner: ReceiverStream<ArrowResult<RecordBatch>>, |
| } |
| |
| impl Stream for ParquetStream { |
| type Item = ArrowResult<RecordBatch>; |
| |
| fn poll_next( |
| mut self: std::pin::Pin<&mut Self>, |
| cx: &mut Context<'_>, |
| ) -> Poll<Option<Self::Item>> { |
| self.inner.poll_next_unpin(cx) |
| } |
| } |
| |
| impl RecordBatchStream for ParquetStream { |
| fn schema(&self) -> SchemaRef { |
| self.schema.clone() |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| use arrow::array::{Int32Array, StringArray}; |
| use futures::StreamExt; |
| use parquet::basic::Type as PhysicalType; |
| use parquet::schema::types::SchemaDescPtr; |
| |
| #[test] |
| fn test_split_files() { |
| let filenames = vec![ |
| "a".to_string(), |
| "b".to_string(), |
| "c".to_string(), |
| "d".to_string(), |
| "e".to_string(), |
| ]; |
| |
| let chunks = split_files(&filenames, 1); |
| assert_eq!(1, chunks.len()); |
| assert_eq!(5, chunks[0].len()); |
| |
| let chunks = split_files(&filenames, 2); |
| assert_eq!(2, chunks.len()); |
| assert_eq!(3, chunks[0].len()); |
| assert_eq!(2, chunks[1].len()); |
| |
| let chunks = split_files(&filenames, 5); |
| assert_eq!(5, chunks.len()); |
| assert_eq!(1, chunks[0].len()); |
| assert_eq!(1, chunks[1].len()); |
| assert_eq!(1, chunks[2].len()); |
| assert_eq!(1, chunks[3].len()); |
| assert_eq!(1, chunks[4].len()); |
| |
| let chunks = split_files(&filenames, 123); |
| assert_eq!(5, chunks.len()); |
| assert_eq!(1, chunks[0].len()); |
| assert_eq!(1, chunks[1].len()); |
| assert_eq!(1, chunks[2].len()); |
| assert_eq!(1, chunks[3].len()); |
| assert_eq!(1, chunks[4].len()); |
| } |
| |
| #[tokio::test] |
| async fn test() -> Result<()> { |
| let testdata = arrow::util::test_util::parquet_test_data(); |
| let filename = format!("{}/alltypes_plain.parquet", testdata); |
| let parquet_exec = |
| ParquetExec::try_from_path(&filename, Some(vec![0, 1, 2]), None, 1024, 4)?; |
| assert_eq!(parquet_exec.output_partitioning().partition_count(), 1); |
| |
| let mut results = parquet_exec.execute(0).await?; |
| let batch = results.next().await.unwrap()?; |
| |
| assert_eq!(8, batch.num_rows()); |
| assert_eq!(3, batch.num_columns()); |
| |
| let schema = batch.schema(); |
| let field_names: Vec<&str> = |
| schema.fields().iter().map(|f| f.name().as_str()).collect(); |
| assert_eq!(vec!["id", "bool_col", "tinyint_col"], field_names); |
| |
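        // once exhausted, the stream should keep returning None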
| let batch = results.next().await; |
| assert!(batch.is_none()); |
| |
| let batch = results.next().await; |
| assert!(batch.is_none()); |
| |
| let batch = results.next().await; |
| assert!(batch.is_none()); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn build_statistics_array_int32() { |
| // build row group metadata array |
| let s1 = ParquetStatistics::int32(None, Some(10), None, 0, false); |
| let s2 = ParquetStatistics::int32(Some(2), Some(20), None, 0, false); |
| let s3 = ParquetStatistics::int32(Some(3), Some(30), None, 0, false); |
| let statistics = vec![Some(&s1), Some(&s2), Some(&s3)]; |
| |
| let statistics_array = |
| build_statistics_array(&statistics, StatisticsType::Min, &DataType::Int32); |
| let int32_array = statistics_array |
| .as_any() |
| .downcast_ref::<Int32Array>() |
| .unwrap(); |
| let int32_vec = int32_array.into_iter().collect::<Vec<_>>(); |
| assert_eq!(int32_vec, vec![None, Some(2), Some(3)]); |
| |
| let statistics_array = |
| build_statistics_array(&statistics, StatisticsType::Max, &DataType::Int32); |
| let int32_array = statistics_array |
| .as_any() |
| .downcast_ref::<Int32Array>() |
| .unwrap(); |
| let int32_vec = int32_array.into_iter().collect::<Vec<_>>(); |
        // the max for the first row group is None rather than the Some(10) that was
        // set, because `has_min_max_set` requires both min and max to be defined and
        // here the min is None
| assert_eq!(int32_vec, vec![None, Some(20), Some(30)]); |
| } |
| |
| #[test] |
| fn build_statistics_array_utf8() { |
| // build row group metadata array |
| let s1 = ParquetStatistics::byte_array(None, Some("10".into()), None, 0, false); |
| let s2 = ParquetStatistics::byte_array( |
| Some("2".into()), |
| Some("20".into()), |
| None, |
| 0, |
| false, |
| ); |
| let s3 = ParquetStatistics::byte_array( |
| Some("3".into()), |
| Some("30".into()), |
| None, |
| 0, |
| false, |
| ); |
| let statistics = vec![Some(&s1), Some(&s2), Some(&s3)]; |
| |
| let statistics_array = |
| build_statistics_array(&statistics, StatisticsType::Min, &DataType::Utf8); |
| let string_array = statistics_array |
| .as_any() |
| .downcast_ref::<StringArray>() |
| .unwrap(); |
| let string_vec = string_array.into_iter().collect::<Vec<_>>(); |
| assert_eq!(string_vec, vec![None, Some("2"), Some("3")]); |
| |
| let statistics_array = |
| build_statistics_array(&statistics, StatisticsType::Max, &DataType::Utf8); |
| let string_array = statistics_array |
| .as_any() |
| .downcast_ref::<StringArray>() |
| .unwrap(); |
| let string_vec = string_array.into_iter().collect::<Vec<_>>(); |
        // the max for the first row group is None rather than the Some("10") that was
        // set, because `has_min_max_set` requires both min and max to be defined and
        // here the min is None
| assert_eq!(string_vec, vec![None, Some("20"), Some("30")]); |
| } |
| |
| #[test] |
| fn build_statistics_array_empty_stats() { |
| let data_type = DataType::Int32; |
| let statistics = vec![]; |
| let statistics_array = |
| build_statistics_array(&statistics, StatisticsType::Min, &data_type); |
| assert_eq!(statistics_array.len(), 0); |
| |
| let statistics = vec![None, None]; |
| let statistics_array = |
| build_statistics_array(&statistics, StatisticsType::Min, &data_type); |
| assert_eq!(statistics_array.len(), statistics.len()); |
| assert_eq!(statistics_array.data_type(), &data_type); |
| for i in 0..statistics_array.len() { |
            assert!(statistics_array.is_null(i));
            assert!(!statistics_array.is_valid(i));
| } |
| } |
| |
| #[test] |
| fn build_statistics_array_unsupported_type() { |
| // boolean is not currently a supported type for statistics |
| let s1 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false); |
| let s2 = ParquetStatistics::boolean(Some(false), Some(true), None, 0, false); |
| let statistics = vec![Some(&s1), Some(&s2)]; |
| let data_type = DataType::Boolean; |
| let statistics_array = |
| build_statistics_array(&statistics, StatisticsType::Min, &data_type); |
| assert_eq!(statistics_array.len(), statistics.len()); |
| assert_eq!(statistics_array.data_type(), &data_type); |
| for i in 0..statistics_array.len() { |
            assert!(statistics_array.is_null(i));
            assert!(!statistics_array.is_valid(i));
| } |
| } |
| |
| #[test] |
| fn row_group_predicate_eq() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
| let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); |
| let expected_expr = "#c1_min LtEq Int32(1) And Int32(1) LtEq #c1_max"; |
| |
| // test column on the left |
| let expr = col("c1").eq(lit(1)); |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| |
| // test column on the right |
| let expr = lit(1).eq(col("c1")); |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn row_group_predicate_gt() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
| let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); |
| let expected_expr = "#c1_max Gt Int32(1)"; |
| |
| // test column on the left |
| let expr = col("c1").gt(lit(1)); |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| |
| // test column on the right |
| let expr = lit(1).lt(col("c1")); |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn row_group_predicate_gt_eq() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
| let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); |
| let expected_expr = "#c1_max GtEq Int32(1)"; |
| |
| // test column on the left |
| let expr = col("c1").gt_eq(lit(1)); |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| // test column on the right |
| let expr = lit(1).lt_eq(col("c1")); |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn row_group_predicate_lt() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
| let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); |
| let expected_expr = "#c1_min Lt Int32(1)"; |
| |
| // test column on the left |
| let expr = col("c1").lt(lit(1)); |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| |
| // test column on the right |
| let expr = lit(1).gt(col("c1")); |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn row_group_predicate_lt_eq() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
| let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); |
| let expected_expr = "#c1_min LtEq Int32(1)"; |
| |
| // test column on the left |
| let expr = col("c1").lt_eq(lit(1)); |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| // test column on the right |
| let expr = lit(1).gt_eq(col("c1")); |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn row_group_predicate_and() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
| let schema = Schema::new(vec![ |
| Field::new("c1", DataType::Int32, false), |
| Field::new("c2", DataType::Int32, false), |
| Field::new("c3", DataType::Int32, false), |
| ]); |
        // test AND operator joining supported c1 < 1 expression and unsupported c2 < c3 expression
| let expr = col("c1").lt(lit(1)).and(col("c2").lt(col("c3"))); |
| let expected_expr = "#c1_min Lt Int32(1) And Boolean(true)"; |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn row_group_predicate_or() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
| let schema = Schema::new(vec![ |
| Field::new("c1", DataType::Int32, false), |
| Field::new("c2", DataType::Int32, false), |
| ]); |
| // test OR operator joining supported c1 < 1 expression and unsupported c2 % 2 expression |
| let expr = col("c1").lt(lit(1)).or(col("c2").modulus(lit(2))); |
| let expected_expr = "#c1_min Lt Int32(1) Or Boolean(true)"; |
| let predicate_expr = build_predicate_expression(&expr, &schema, &mut vec![])?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn row_group_predicate_stat_column_req() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
| let schema = Schema::new(vec![ |
| Field::new("c1", DataType::Int32, false), |
| Field::new("c2", DataType::Int32, false), |
| ]); |
| let mut stat_column_req = vec![]; |
| // c1 < 1 and (c2 = 2 or c2 = 3) |
| let expr = col("c1") |
| .lt(lit(1)) |
| .and(col("c2").eq(lit(2)).or(col("c2").eq(lit(3)))); |
| let expected_expr = "#c1_min Lt Int32(1) And #c2_min LtEq Int32(2) And Int32(2) LtEq #c2_max Or #c2_min LtEq Int32(3) And Int32(3) LtEq #c2_max"; |
| let predicate_expr = |
| build_predicate_expression(&expr, &schema, &mut stat_column_req)?; |
| assert_eq!(format!("{:?}", predicate_expr), expected_expr); |
| // c1 < 1 should add c1_min |
| let c1_min_field = Field::new("c1_min", DataType::Int32, false); |
| assert_eq!( |
| stat_column_req[0], |
| ("c1".to_owned(), StatisticsType::Min, c1_min_field) |
| ); |
| // c2 = 2 should add c2_min and c2_max |
| let c2_min_field = Field::new("c2_min", DataType::Int32, false); |
| assert_eq!( |
| stat_column_req[1], |
| ("c2".to_owned(), StatisticsType::Min, c2_min_field) |
| ); |
| let c2_max_field = Field::new("c2_max", DataType::Int32, false); |
| assert_eq!( |
| stat_column_req[2], |
| ("c2".to_owned(), StatisticsType::Max, c2_max_field) |
| ); |
| // c2 = 3 shouldn't add any new statistics fields |
| assert_eq!(stat_column_req.len(), 3); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn row_group_predicate_builder_simple_expr() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
        // c1 > 15 => c1_max > 15
| let expr = col("c1").gt(lit(15)); |
| let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); |
| let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; |
| |
| let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); |
| let rgm1 = get_row_group_meta_data( |
| &schema_descr, |
| vec![ParquetStatistics::int32(Some(1), Some(10), None, 0, false)], |
| ); |
| let rgm2 = get_row_group_meta_data( |
| &schema_descr, |
| vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], |
| ); |
| let row_group_metadata = vec![rgm1, rgm2]; |
| let row_group_predicate = |
| predicate_builder.build_row_group_predicate(&row_group_metadata); |
| let row_group_filter = row_group_metadata |
| .iter() |
| .enumerate() |
| .map(|(i, g)| row_group_predicate(g, i)) |
| .collect::<Vec<_>>(); |
| assert_eq!(row_group_filter, vec![false, true]); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn row_group_predicate_builder_missing_stats() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
        // c1 > 15 => c1_max > 15
| let expr = col("c1").gt(lit(15)); |
| let schema = Schema::new(vec![Field::new("c1", DataType::Int32, false)]); |
| let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; |
| |
| let schema_descr = get_test_schema_descr(vec![("c1", PhysicalType::INT32)]); |
| let rgm1 = get_row_group_meta_data( |
| &schema_descr, |
| vec![ParquetStatistics::int32(None, None, None, 0, false)], |
| ); |
| let rgm2 = get_row_group_meta_data( |
| &schema_descr, |
| vec![ParquetStatistics::int32(Some(11), Some(20), None, 0, false)], |
| ); |
| let row_group_metadata = vec![rgm1, rgm2]; |
| let row_group_predicate = |
| predicate_builder.build_row_group_predicate(&row_group_metadata); |
| let row_group_filter = row_group_metadata |
| .iter() |
| .enumerate() |
| .map(|(i, g)| row_group_predicate(g, i)) |
| .collect::<Vec<_>>(); |
| // missing statistics for first row group mean that the result from the predicate expression |
| // is null / undefined so the first row group can't be filtered out |
| assert_eq!(row_group_filter, vec![true, true]); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn row_group_predicate_builder_partial_expr() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
| // test row group predicate with partially supported expression |
        // c1 > 15 AND c2 % 2 => c1_max > 15 AND true
| let expr = col("c1").gt(lit(15)).and(col("c2").modulus(lit(2))); |
| let schema = Schema::new(vec![ |
| Field::new("c1", DataType::Int32, false), |
| Field::new("c2", DataType::Int32, false), |
| ]); |
| let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema.clone())?; |
| |
| let schema_descr = get_test_schema_descr(vec![ |
| ("c1", PhysicalType::INT32), |
| ("c2", PhysicalType::INT32), |
| ]); |
| let rgm1 = get_row_group_meta_data( |
| &schema_descr, |
| vec![ |
| ParquetStatistics::int32(Some(1), Some(10), None, 0, false), |
| ParquetStatistics::int32(Some(1), Some(10), None, 0, false), |
| ], |
| ); |
| let rgm2 = get_row_group_meta_data( |
| &schema_descr, |
| vec![ |
| ParquetStatistics::int32(Some(11), Some(20), None, 0, false), |
| ParquetStatistics::int32(Some(11), Some(20), None, 0, false), |
| ], |
| ); |
| let row_group_metadata = vec![rgm1, rgm2]; |
| let row_group_predicate = |
| predicate_builder.build_row_group_predicate(&row_group_metadata); |
| let row_group_filter = row_group_metadata |
| .iter() |
| .enumerate() |
| .map(|(i, g)| row_group_predicate(g, i)) |
| .collect::<Vec<_>>(); |
| // the first row group is still filtered out because the predicate expression can be partially evaluated |
| // when conditions are joined using AND |
| assert_eq!(row_group_filter, vec![false, true]); |
| |
| // if conditions in predicate are joined with OR and an unsupported expression is used |
| // this bypasses the entire predicate expression and no row groups are filtered out |
| let expr = col("c1").gt(lit(15)).or(col("c2").modulus(lit(2))); |
| let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; |
| let row_group_predicate = |
| predicate_builder.build_row_group_predicate(&row_group_metadata); |
| let row_group_filter = row_group_metadata |
| .iter() |
| .enumerate() |
| .map(|(i, g)| row_group_predicate(g, i)) |
| .collect::<Vec<_>>(); |
| assert_eq!(row_group_filter, vec![true, true]); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn row_group_predicate_builder_unsupported_type() -> Result<()> { |
| use crate::logical_plan::{col, lit}; |
| // test row group predicate with unsupported statistics type (boolean) |
| // where a null array is generated for some statistics columns |
        // c1 > 15 AND c2 = true => c1_max > 15 AND null
| let expr = col("c1").gt(lit(15)).and(col("c2").eq(lit(true))); |
| let schema = Schema::new(vec![ |
| Field::new("c1", DataType::Int32, false), |
| Field::new("c2", DataType::Boolean, false), |
| ]); |
| let predicate_builder = RowGroupPredicateBuilder::try_new(&expr, schema)?; |
| |
| let schema_descr = get_test_schema_descr(vec![ |
| ("c1", PhysicalType::INT32), |
| ("c2", PhysicalType::BOOLEAN), |
| ]); |
| let rgm1 = get_row_group_meta_data( |
| &schema_descr, |
| vec![ |
| ParquetStatistics::int32(Some(1), Some(10), None, 0, false), |
| ParquetStatistics::boolean(Some(false), Some(true), None, 0, false), |
| ], |
| ); |
| let rgm2 = get_row_group_meta_data( |
| &schema_descr, |
| vec![ |
| ParquetStatistics::int32(Some(11), Some(20), None, 0, false), |
| ParquetStatistics::boolean(Some(false), Some(true), None, 0, false), |
| ], |
| ); |
| let row_group_metadata = vec![rgm1, rgm2]; |
| let row_group_predicate = |
| predicate_builder.build_row_group_predicate(&row_group_metadata); |
| let row_group_filter = row_group_metadata |
| .iter() |
| .enumerate() |
| .map(|(i, g)| row_group_predicate(g, i)) |
| .collect::<Vec<_>>(); |
| // no row group is filtered out because the predicate expression can't be evaluated |
| // when a null array is generated for a statistics column, |
| // because the null values propagate to the end result, making the predicate result undefined |
| assert_eq!(row_group_filter, vec![true, true]); |
| |
| Ok(()) |
| } |
| |
| fn get_row_group_meta_data( |
| schema_descr: &SchemaDescPtr, |
| column_statistics: Vec<ParquetStatistics>, |
| ) -> RowGroupMetaData { |
| use parquet::file::metadata::ColumnChunkMetaData; |
| let mut columns = vec![]; |
| for (i, s) in column_statistics.iter().enumerate() { |
| let column = ColumnChunkMetaData::builder(schema_descr.column(i)) |
| .set_statistics(s.clone()) |
| .build() |
| .unwrap(); |
| columns.push(column); |
| } |
| RowGroupMetaData::builder(schema_descr.clone()) |
| .set_num_rows(1000) |
| .set_total_byte_size(2000) |
| .set_column_metadata(columns) |
| .build() |
| .unwrap() |
| } |
| |
| fn get_test_schema_descr(fields: Vec<(&str, PhysicalType)>) -> SchemaDescPtr { |
| use parquet::schema::types::{SchemaDescriptor, Type as SchemaType}; |
| let mut schema_fields = fields |
| .iter() |
| .map(|(n, t)| { |
| Arc::new(SchemaType::primitive_type_builder(n, *t).build().unwrap()) |
| }) |
| .collect::<Vec<_>>(); |
| let schema = SchemaType::group_type_builder("schema") |
| .with_fields(&mut schema_fields) |
| .build() |
| .unwrap(); |
| |
| Arc::new(SchemaDescriptor::new(Arc::new(schema))) |
| } |
| } |