| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| //! Contains implementation of record assembly and converting Parquet types into |
| //! [`Row`](crate::record::Row)s. |
| |
| use std::{collections::HashMap, fmt, sync::Arc}; |
| |
| use crate::basic::{ConvertedType, Repetition}; |
| use crate::errors::{ParquetError, Result}; |
| use crate::file::reader::{FileReader, RowGroupReader}; |
| use crate::record::{ |
| api::{make_list, make_map, make_row, Field, Row}, |
| triplet::TripletIter, |
| }; |
| use crate::schema::types::{ColumnPath, SchemaDescPtr, SchemaDescriptor, Type, TypePtr}; |
| |
| /// Default batch size for a reader |
| const DEFAULT_BATCH_SIZE: usize = 1024; |
| |
| /// Tree builder for `Reader` enum. |
| /// Serves as a container of options for building a reader tree and a builder, and |
| /// accessing a records iterator [`RowIter`]. |
| pub struct TreeBuilder { |
| // Batch size (>= 1) for triplet iterators |
| batch_size: usize, |
| } |
| |
| impl TreeBuilder { |
| /// Creates new tree builder with default parameters. |
| pub fn new() -> Self { |
| Self { |
| batch_size: DEFAULT_BATCH_SIZE, |
| } |
| } |
| |
| /// Sets batch size for this tree builder. |
| pub fn with_batch_size(mut self, batch_size: usize) -> Self { |
| self.batch_size = batch_size; |
| self |
| } |
| |
| /// Creates new root reader for provided schema and row group. |
| pub fn build( |
| &self, |
| descr: SchemaDescPtr, |
| row_group_reader: &dyn RowGroupReader, |
| ) -> Reader { |
| // Prepare lookup table of column path -> original column index |
| // This allows to prune columns and map schema leaf nodes to the column readers |
| let mut paths: HashMap<ColumnPath, usize> = HashMap::new(); |
| let row_group_metadata = row_group_reader.metadata(); |
| |
| for col_index in 0..row_group_reader.num_columns() { |
| let col_meta = row_group_metadata.column(col_index); |
| let col_path = col_meta.column_path().clone(); |
| paths.insert(col_path, col_index); |
| } |
| |
| // Build child readers for the message type |
| let mut readers = Vec::new(); |
| let mut path = Vec::new(); |
| |
| for field in descr.root_schema().get_fields() { |
| let reader = self.reader_tree( |
| field.clone(), |
| &mut path, |
| 0, |
| 0, |
| &paths, |
| row_group_reader, |
| ); |
| readers.push(reader); |
| } |
| |
| // Return group reader for message type, |
| // it is always required with definition level 0 |
| Reader::GroupReader(None, 0, readers) |
| } |
| |
| /// Creates iterator of `Row`s directly from schema descriptor and row group. |
| pub fn as_iter( |
| &self, |
| descr: SchemaDescPtr, |
| row_group_reader: &dyn RowGroupReader, |
| ) -> ReaderIter { |
| let num_records = row_group_reader.metadata().num_rows() as usize; |
| ReaderIter::new(self.build(descr, row_group_reader), num_records) |
| } |
| |
| /// Builds tree of readers for the current schema recursively. |
| fn reader_tree( |
| &self, |
| field: TypePtr, |
| mut path: &mut Vec<String>, |
| mut curr_def_level: i16, |
| mut curr_rep_level: i16, |
| paths: &HashMap<ColumnPath, usize>, |
| row_group_reader: &dyn RowGroupReader, |
| ) -> Reader { |
| assert!(field.get_basic_info().has_repetition()); |
| // Update current definition and repetition levels for this type |
| let repetition = field.get_basic_info().repetition(); |
| match repetition { |
| Repetition::OPTIONAL => { |
| curr_def_level += 1; |
| } |
| Repetition::REPEATED => { |
| curr_def_level += 1; |
| curr_rep_level += 1; |
| } |
| _ => {} |
| } |
| |
| path.push(String::from(field.name())); |
| let reader = if field.is_primitive() { |
| let col_path = ColumnPath::new(path.to_vec()); |
| let orig_index = *paths.get(&col_path).unwrap(); |
| let col_descr = row_group_reader |
| .metadata() |
| .column(orig_index) |
| .column_descr_ptr(); |
| let col_reader = row_group_reader.get_column_reader(orig_index).unwrap(); |
| let column = TripletIter::new(col_descr, col_reader, self.batch_size); |
| Reader::PrimitiveReader(field, column) |
| } else { |
| match field.get_basic_info().converted_type() { |
| // List types |
| ConvertedType::LIST => { |
| assert_eq!( |
| field.get_fields().len(), |
| 1, |
| "Invalid list type {:?}", |
| field |
| ); |
| |
| let repeated_field = field.get_fields()[0].clone(); |
| assert_eq!( |
| repeated_field.get_basic_info().repetition(), |
| Repetition::REPEATED, |
| "Invalid list type {:?}", |
| field |
| ); |
| |
| if Reader::is_element_type(&repeated_field) { |
| // Support for backward compatible lists |
| let reader = self.reader_tree( |
| repeated_field, |
| &mut path, |
| curr_def_level, |
| curr_rep_level, |
| paths, |
| row_group_reader, |
| ); |
| |
| Reader::RepeatedReader( |
| field, |
| curr_def_level, |
| curr_rep_level, |
| Box::new(reader), |
| ) |
| } else { |
| let child_field = repeated_field.get_fields()[0].clone(); |
| |
| path.push(String::from(repeated_field.name())); |
| |
| let reader = self.reader_tree( |
| child_field, |
| &mut path, |
| curr_def_level + 1, |
| curr_rep_level + 1, |
| paths, |
| row_group_reader, |
| ); |
| |
| path.pop(); |
| |
| Reader::RepeatedReader( |
| field, |
| curr_def_level, |
| curr_rep_level, |
| Box::new(reader), |
| ) |
| } |
| } |
| // Map types (key-value pairs) |
| ConvertedType::MAP | ConvertedType::MAP_KEY_VALUE => { |
| assert_eq!( |
| field.get_fields().len(), |
| 1, |
| "Invalid map type: {:?}", |
| field |
| ); |
| assert!( |
| !field.get_fields()[0].is_primitive(), |
| "Invalid map type: {:?}", |
| field |
| ); |
| |
| let key_value_type = field.get_fields()[0].clone(); |
| assert_eq!( |
| key_value_type.get_basic_info().repetition(), |
| Repetition::REPEATED, |
| "Invalid map type: {:?}", |
| field |
| ); |
| assert_eq!( |
| key_value_type.get_fields().len(), |
| 2, |
| "Invalid map type: {:?}", |
| field |
| ); |
| |
| path.push(String::from(key_value_type.name())); |
| |
| let key_type = &key_value_type.get_fields()[0]; |
| assert!( |
| key_type.is_primitive(), |
| "Map key type is expected to be a primitive type, but found {:?}", |
| key_type |
| ); |
| let key_reader = self.reader_tree( |
| key_type.clone(), |
| &mut path, |
| curr_def_level + 1, |
| curr_rep_level + 1, |
| paths, |
| row_group_reader, |
| ); |
| |
| let value_type = &key_value_type.get_fields()[1]; |
| let value_reader = self.reader_tree( |
| value_type.clone(), |
| &mut path, |
| curr_def_level + 1, |
| curr_rep_level + 1, |
| paths, |
| row_group_reader, |
| ); |
| |
| path.pop(); |
| |
| Reader::KeyValueReader( |
| field, |
| curr_def_level, |
| curr_rep_level, |
| Box::new(key_reader), |
| Box::new(value_reader), |
| ) |
| } |
| // A repeated field that is neither contained by a `LIST`- or |
| // `MAP`-annotated group nor annotated by `LIST` or `MAP` |
| // should be interpreted as a required list of required |
| // elements where the element type is the type of the field. |
| _ if repetition == Repetition::REPEATED => { |
| let required_field = Type::group_type_builder(field.name()) |
| .with_repetition(Repetition::REQUIRED) |
| .with_converted_type(field.get_basic_info().converted_type()) |
| .with_fields(&mut Vec::from(field.get_fields())) |
| .build() |
| .unwrap(); |
| |
| path.pop(); |
| |
| let reader = self.reader_tree( |
| Arc::new(required_field), |
| &mut path, |
| curr_def_level, |
| curr_rep_level, |
| paths, |
| row_group_reader, |
| ); |
| |
| Reader::RepeatedReader( |
| field, |
| curr_def_level - 1, |
| curr_rep_level - 1, |
| Box::new(reader), |
| ) |
| } |
| // Group types (structs) |
| _ => { |
| let mut readers = Vec::new(); |
| for child in field.get_fields() { |
| let reader = self.reader_tree( |
| child.clone(), |
| &mut path, |
| curr_def_level, |
| curr_rep_level, |
| paths, |
| row_group_reader, |
| ); |
| readers.push(reader); |
| } |
| Reader::GroupReader(Some(field), curr_def_level, readers) |
| } |
| } |
| }; |
| path.pop(); |
| |
| Reader::option(repetition, curr_def_level, reader) |
| } |
| } |
| |
| /// Reader tree for record assembly |
| pub enum Reader { |
| // Primitive reader with type information and triplet iterator |
| PrimitiveReader(TypePtr, TripletIter), |
| // Optional reader with definition level of a parent and a reader |
| OptionReader(i16, Box<Reader>), |
| // Group (struct) reader with type information, definition level and list of child |
| // readers. When it represents message type, type information is None |
| GroupReader(Option<TypePtr>, i16, Vec<Reader>), |
| // Reader for repeated values, e.g. lists, contains type information, definition |
| // level, repetition level and a child reader |
| RepeatedReader(TypePtr, i16, i16, Box<Reader>), |
| // Reader of key-value pairs, e.g. maps, contains type information, definition |
| // level, repetition level, child reader for keys and child reader for values |
| KeyValueReader(TypePtr, i16, i16, Box<Reader>, Box<Reader>), |
| } |
| |
| impl Reader { |
| /// Wraps reader in option reader based on repetition. |
| fn option(repetition: Repetition, def_level: i16, reader: Reader) -> Self { |
| if repetition == Repetition::OPTIONAL { |
| Reader::OptionReader(def_level - 1, Box::new(reader)) |
| } else { |
| reader |
| } |
| } |
| |
| /// Returns true if repeated type is an element type for the list. |
| /// Used to determine legacy list types. |
| /// This method is copied from Spark Parquet reader and is based on the reference: |
| /// <https://github.com/apache/parquet-format/blob/master/LogicalTypes.md> |
| /// #backward-compatibility-rules |
| fn is_element_type(repeated_type: &Type) -> bool { |
| // For legacy 2-level list types with primitive element type, e.g.: |
| // |
| // // ARRAY<INT> (nullable list, non-null elements) |
| // optional group my_list (LIST) { |
| // repeated int32 element; |
| // } |
| // |
| repeated_type.is_primitive() || |
| // For legacy 2-level list types whose element type is a group type with 2 or more |
| // fields, e.g.: |
| // |
| // // ARRAY<STRUCT<str: STRING, num: INT>> (nullable list, non-null elements) |
| // optional group my_list (LIST) { |
| // repeated group element { |
| // required binary str (UTF8); |
| // required int32 num; |
| // }; |
| // } |
| // |
| repeated_type.is_group() && repeated_type.get_fields().len() > 1 || |
| // For legacy 2-level list types generated by parquet-avro (Parquet version < 1.6.0), |
| // e.g.: |
| // |
| // // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements) |
| // optional group my_list (LIST) { |
| // repeated group array { |
| // required binary str (UTF8); |
| // }; |
| // } |
| // |
| repeated_type.name() == "array" || |
| // For Parquet data generated by parquet-thrift, e.g.: |
| // |
| // // ARRAY<STRUCT<str: STRING>> (nullable list, non-null elements) |
| // optional group my_list (LIST) { |
| // repeated group my_list_tuple { |
| // required binary str (UTF8); |
| // }; |
| // } |
| // |
| repeated_type.name().ends_with("_tuple") |
| } |
| |
| /// Reads current record as `Row` from the reader tree. |
| /// Automatically advances all necessary readers. |
| /// This must be called on the root level reader (i.e., for Message type). |
| /// Otherwise, it will panic. |
| fn read(&mut self) -> Row { |
| match *self { |
| Reader::GroupReader(_, _, ref mut readers) => { |
| let mut fields = Vec::new(); |
| for reader in readers { |
| fields.push((String::from(reader.field_name()), reader.read_field())); |
| } |
| make_row(fields) |
| } |
| _ => panic!("Cannot call read() on {}", self), |
| } |
| } |
| |
| /// Reads current record as `Field` from the reader tree. |
| /// Automatically advances all necessary readers. |
| fn read_field(&mut self) -> Field { |
| match *self { |
| Reader::PrimitiveReader(_, ref mut column) => { |
| let value = column.current_value(); |
| column.read_next().unwrap(); |
| value |
| } |
| Reader::OptionReader(def_level, ref mut reader) => { |
| if reader.current_def_level() > def_level { |
| reader.read_field() |
| } else { |
| reader.advance_columns(); |
| Field::Null |
| } |
| } |
| Reader::GroupReader(_, def_level, ref mut readers) => { |
| let mut fields = Vec::new(); |
| for reader in readers { |
| if reader.repetition() != Repetition::OPTIONAL |
| || reader.current_def_level() > def_level |
| { |
| fields.push(( |
| String::from(reader.field_name()), |
| reader.read_field(), |
| )); |
| } else { |
| reader.advance_columns(); |
| fields.push((String::from(reader.field_name()), Field::Null)); |
| } |
| } |
| let row = make_row(fields); |
| Field::Group(row) |
| } |
| Reader::RepeatedReader(_, def_level, rep_level, ref mut reader) => { |
| let mut elements = Vec::new(); |
| loop { |
| if reader.current_def_level() > def_level { |
| elements.push(reader.read_field()); |
| } else { |
| reader.advance_columns(); |
| // If the current definition level is equal to the definition |
| // level of this repeated type, then the |
| // result is an empty list and the repetition level |
| // will always be <= rl. |
| break; |
| } |
| |
| // This covers case when we are out of repetition levels and should |
| // close the group, or there are no values left to |
| // buffer. |
| if !reader.has_next() || reader.current_rep_level() <= rep_level { |
| break; |
| } |
| } |
| Field::ListInternal(make_list(elements)) |
| } |
| Reader::KeyValueReader( |
| _, |
| def_level, |
| rep_level, |
| ref mut keys, |
| ref mut values, |
| ) => { |
| let mut pairs = Vec::new(); |
| loop { |
| if keys.current_def_level() > def_level { |
| pairs.push((keys.read_field(), values.read_field())); |
| } else { |
| keys.advance_columns(); |
| values.advance_columns(); |
| // If the current definition level is equal to the definition |
| // level of this repeated type, then the |
| // result is an empty list and the repetition level |
| // will always be <= rl. |
| break; |
| } |
| |
| // This covers case when we are out of repetition levels and should |
| // close the group, or there are no values left to |
| // buffer. |
| if !keys.has_next() || keys.current_rep_level() <= rep_level { |
| break; |
| } |
| } |
| |
| Field::MapInternal(make_map(pairs)) |
| } |
| } |
| } |
| |
| /// Returns field name for the current reader. |
| fn field_name(&self) -> &str { |
| match *self { |
| Reader::PrimitiveReader(ref field, _) => field.name(), |
| Reader::OptionReader(_, ref reader) => reader.field_name(), |
| Reader::GroupReader(ref opt, ..) => match opt { |
| Some(ref field) => field.name(), |
| None => panic!("Field is None for group reader"), |
| }, |
| Reader::RepeatedReader(ref field, ..) => field.name(), |
| Reader::KeyValueReader(ref field, ..) => field.name(), |
| } |
| } |
| |
| /// Returns repetition for the current reader. |
| fn repetition(&self) -> Repetition { |
| match *self { |
| Reader::PrimitiveReader(ref field, _) => field.get_basic_info().repetition(), |
| Reader::OptionReader(_, ref reader) => reader.repetition(), |
| Reader::GroupReader(ref opt, ..) => match opt { |
| Some(ref field) => field.get_basic_info().repetition(), |
| None => panic!("Field is None for group reader"), |
| }, |
| Reader::RepeatedReader(ref field, ..) => field.get_basic_info().repetition(), |
| Reader::KeyValueReader(ref field, ..) => field.get_basic_info().repetition(), |
| } |
| } |
| |
| /// Returns true, if current reader has more values, false otherwise. |
| /// Method does not advance internal iterator. |
| fn has_next(&self) -> bool { |
| match *self { |
| Reader::PrimitiveReader(_, ref column) => column.has_next(), |
| Reader::OptionReader(_, ref reader) => reader.has_next(), |
| Reader::GroupReader(_, _, ref readers) => readers.first().unwrap().has_next(), |
| Reader::RepeatedReader(_, _, _, ref reader) => reader.has_next(), |
| Reader::KeyValueReader(_, _, _, ref keys, _) => keys.has_next(), |
| } |
| } |
| |
| /// Returns current definition level, |
| /// Method does not advance internal iterator. |
| fn current_def_level(&self) -> i16 { |
| match *self { |
| Reader::PrimitiveReader(_, ref column) => column.current_def_level(), |
| Reader::OptionReader(_, ref reader) => reader.current_def_level(), |
| Reader::GroupReader(_, _, ref readers) => match readers.first() { |
| Some(reader) => reader.current_def_level(), |
| None => panic!("Current definition level: empty group reader"), |
| }, |
| Reader::RepeatedReader(_, _, _, ref reader) => reader.current_def_level(), |
| Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_def_level(), |
| } |
| } |
| |
| /// Returns current repetition level. |
| /// Method does not advance internal iterator. |
| fn current_rep_level(&self) -> i16 { |
| match *self { |
| Reader::PrimitiveReader(_, ref column) => column.current_rep_level(), |
| Reader::OptionReader(_, ref reader) => reader.current_rep_level(), |
| Reader::GroupReader(_, _, ref readers) => match readers.first() { |
| Some(reader) => reader.current_rep_level(), |
| None => panic!("Current repetition level: empty group reader"), |
| }, |
| Reader::RepeatedReader(_, _, _, ref reader) => reader.current_rep_level(), |
| Reader::KeyValueReader(_, _, _, ref keys, _) => keys.current_rep_level(), |
| } |
| } |
| |
| /// Advances leaf columns for the current reader. |
| fn advance_columns(&mut self) { |
| match *self { |
| Reader::PrimitiveReader(_, ref mut column) => { |
| column.read_next().unwrap(); |
| } |
| Reader::OptionReader(_, ref mut reader) => { |
| reader.advance_columns(); |
| } |
| Reader::GroupReader(_, _, ref mut readers) => { |
| for reader in readers { |
| reader.advance_columns(); |
| } |
| } |
| Reader::RepeatedReader(_, _, _, ref mut reader) => { |
| reader.advance_columns(); |
| } |
| Reader::KeyValueReader(_, _, _, ref mut keys, ref mut values) => { |
| keys.advance_columns(); |
| values.advance_columns(); |
| } |
| } |
| } |
| } |
| |
| impl fmt::Display for Reader { |
| fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
| let s = match self { |
| Reader::PrimitiveReader(..) => "PrimitiveReader", |
| Reader::OptionReader(..) => "OptionReader", |
| Reader::GroupReader(..) => "GroupReader", |
| Reader::RepeatedReader(..) => "RepeatedReader", |
| Reader::KeyValueReader(..) => "KeyValueReader", |
| }; |
| write!(f, "{}", s) |
| } |
| } |
| |
| // ---------------------------------------------------------------------- |
| // Row iterators |
| |
| /// The enum Either with variants That represet a reference and a box of |
| /// [`FileReader`](crate::file::reader::FileReader). |
| enum Either<'a> { |
| Left(&'a dyn FileReader), |
| Right(Box<dyn FileReader>), |
| } |
| |
| impl<'a> Either<'a> { |
| fn reader(&self) -> &dyn FileReader { |
| match *self { |
| Either::Left(r) => r, |
| Either::Right(ref r) => &**r, |
| } |
| } |
| } |
| |
| /// Iterator of [`Row`](crate::record::Row)s. |
| /// It is used either for a single row group to iterate over data in that row group, or |
| /// an entire file with auto buffering of all row groups. |
| pub struct RowIter<'a> { |
| descr: SchemaDescPtr, |
| tree_builder: TreeBuilder, |
| file_reader: Option<Either<'a>>, |
| current_row_group: usize, |
| num_row_groups: usize, |
| row_iter: Option<ReaderIter>, |
| } |
| |
| impl<'a> RowIter<'a> { |
| /// Creates a new iterator of [`Row`](crate::record::Row)s. |
| fn new( |
| file_reader: Option<Either<'a>>, |
| row_iter: Option<ReaderIter>, |
| descr: SchemaDescPtr, |
| ) -> Self { |
| let tree_builder = Self::tree_builder(); |
| let num_row_groups = match file_reader { |
| Some(ref r) => r.reader().num_row_groups(), |
| None => 0, |
| }; |
| |
| Self { |
| descr, |
| file_reader, |
| tree_builder, |
| num_row_groups, |
| row_iter, |
| current_row_group: 0, |
| } |
| } |
| |
| /// Creates iterator of [`Row`](crate::record::Row)s for all row groups in a |
| /// file. |
| pub fn from_file(proj: Option<Type>, reader: &'a dyn FileReader) -> Result<Self> { |
| let either = Either::Left(reader); |
| let descr = Self::get_proj_descr( |
| proj, |
| reader.metadata().file_metadata().schema_descr_ptr(), |
| )?; |
| |
| Ok(Self::new(Some(either), None, descr)) |
| } |
| |
| /// Creates iterator of [`Row`](crate::record::Row)s for a specific row group. |
| pub fn from_row_group( |
| proj: Option<Type>, |
| reader: &'a dyn RowGroupReader, |
| ) -> Result<Self> { |
| let descr = Self::get_proj_descr(proj, reader.metadata().schema_descr_ptr())?; |
| let tree_builder = Self::tree_builder(); |
| let row_iter = tree_builder.as_iter(descr.clone(), reader); |
| |
| // For row group we need to set `current_row_group` >= `num_row_groups`, because |
| // we only have one row group and can't buffer more. |
| Ok(Self::new(None, Some(row_iter), descr)) |
| } |
| |
| /// Creates a iterator of [`Row`](crate::record::Row)s from a |
| /// [`FileReader`](crate::file::reader::FileReader) using the full file schema. |
| pub fn from_file_into(reader: Box<dyn FileReader>) -> Self { |
| let either = Either::Right(reader); |
| let descr = either |
| .reader() |
| .metadata() |
| .file_metadata() |
| .schema_descr_ptr(); |
| |
| Self::new(Some(either), None, descr) |
| } |
| |
| /// Tries to create a iterator of [`Row`](crate::record::Row)s using projections. |
| /// Returns a error if a file reader is not the source of this iterator. |
| /// |
| /// The Projected schema can be a subset of or equal to the file schema, |
| /// when it is None, full file schema is assumed. |
| pub fn project(self, proj: Option<Type>) -> Result<Self> { |
| match self.file_reader { |
| Some(ref either) => { |
| let schema = either |
| .reader() |
| .metadata() |
| .file_metadata() |
| .schema_descr_ptr(); |
| let descr = Self::get_proj_descr(proj, schema)?; |
| |
| Ok(Self::new(self.file_reader, None, descr)) |
| } |
| None => Err(general_err!("File reader is required to use projections")), |
| } |
| } |
| |
| /// Helper method to get schema descriptor for projected schema. |
| /// If projection is None, then full schema is returned. |
| #[inline] |
| fn get_proj_descr( |
| proj: Option<Type>, |
| root_descr: SchemaDescPtr, |
| ) -> Result<SchemaDescPtr> { |
| match proj { |
| Some(projection) => { |
| // check if projection is part of file schema |
| let root_schema = root_descr.root_schema(); |
| if !root_schema.check_contains(&projection) { |
| return Err(general_err!("Root schema does not contain projection")); |
| } |
| Ok(Arc::new(SchemaDescriptor::new(Arc::new(projection)))) |
| } |
| None => Ok(root_descr), |
| } |
| } |
| |
| /// Returns common tree builder, so the same settings are applied to both iterators |
| /// from file reader and row group. |
| #[inline] |
| fn tree_builder() -> TreeBuilder { |
| TreeBuilder::new() |
| } |
| } |
| |
| impl<'a> Iterator for RowIter<'a> { |
| type Item = Row; |
| |
| fn next(&mut self) -> Option<Row> { |
| let mut row = None; |
| if let Some(ref mut iter) = self.row_iter { |
| row = iter.next(); |
| } |
| |
| while row.is_none() && self.current_row_group < self.num_row_groups { |
| // We do not expect any failures when accessing a row group, and file reader |
| // must be set for selecting next row group. |
| if let Some(ref either) = self.file_reader { |
| let file_reader = either.reader(); |
| let row_group_reader = &*file_reader |
| .get_row_group(self.current_row_group) |
| .expect("Row group is required to advance"); |
| |
| let mut iter = self |
| .tree_builder |
| .as_iter(self.descr.clone(), row_group_reader); |
| |
| row = iter.next(); |
| |
| self.current_row_group += 1; |
| self.row_iter = Some(iter); |
| } |
| } |
| |
| row |
| } |
| } |
| |
| /// Internal iterator of [`Row`](crate::record::Row)s for a reader. |
| pub struct ReaderIter { |
| root_reader: Reader, |
| records_left: usize, |
| } |
| |
| impl ReaderIter { |
| fn new(mut root_reader: Reader, num_records: usize) -> Self { |
| // Prepare root reader by advancing all column vectors |
| root_reader.advance_columns(); |
| Self { |
| root_reader, |
| records_left: num_records, |
| } |
| } |
| } |
| |
| impl Iterator for ReaderIter { |
| type Item = Row; |
| |
| fn next(&mut self) -> Option<Row> { |
| if self.records_left > 0 { |
| self.records_left -= 1; |
| Some(self.root_reader.read()) |
| } else { |
| None |
| } |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| use crate::errors::{ParquetError, Result}; |
| use crate::file::reader::{FileReader, SerializedFileReader}; |
| use crate::record::api::{Field, Row, RowAccessor, RowFormatter}; |
| use crate::schema::parser::parse_message_type; |
| use crate::util::test_common::{get_test_file, get_test_path}; |
| use std::convert::TryFrom; |
| |
| // Convenient macros to assemble row, list, map, and group. |
| |
| macro_rules! row { |
| () => { |
| { |
| let result = Vec::new(); |
| make_row(result) |
| } |
| }; |
| ( $( $e:expr ), + ) => { |
| { |
| let mut result = Vec::new(); |
| $( |
| result.push($e); |
| )* |
| make_row(result) |
| } |
| } |
| } |
| |
| macro_rules! list { |
| () => { |
| { |
| let result = Vec::new(); |
| Field::ListInternal(make_list(result)) |
| } |
| }; |
| ( $( $e:expr ), + ) => { |
| { |
| let mut result = Vec::new(); |
| $( |
| result.push($e); |
| )* |
| Field::ListInternal(make_list(result)) |
| } |
| } |
| } |
| |
| macro_rules! map { |
| () => { |
| { |
| let result = Vec::new(); |
| Field::MapInternal(make_map(result)) |
| } |
| }; |
| ( $( $e:expr ), + ) => { |
| { |
| let mut result = Vec::new(); |
| $( |
| result.push($e); |
| )* |
| Field::MapInternal(make_map(result)) |
| } |
| } |
| } |
| |
| macro_rules! group { |
| ( $( $e:expr ), * ) => { |
| { |
| Field::Group(row!($( $e ), *)) |
| } |
| } |
| } |
| |
| #[test] |
| fn test_file_reader_rows_nulls() { |
| let rows = test_file_reader_rows("nulls.snappy.parquet", None).unwrap(); |
| let expected_rows = vec![ |
| row![( |
| "b_struct".to_string(), |
| group![("b_c_int".to_string(), Field::Null)] |
| )], |
| row![( |
| "b_struct".to_string(), |
| group![("b_c_int".to_string(), Field::Null)] |
| )], |
| row![( |
| "b_struct".to_string(), |
| group![("b_c_int".to_string(), Field::Null)] |
| )], |
| row![( |
| "b_struct".to_string(), |
| group![("b_c_int".to_string(), Field::Null)] |
| )], |
| row![( |
| "b_struct".to_string(), |
| group![("b_c_int".to_string(), Field::Null)] |
| )], |
| row![( |
| "b_struct".to_string(), |
| group![("b_c_int".to_string(), Field::Null)] |
| )], |
| row![( |
| "b_struct".to_string(), |
| group![("b_c_int".to_string(), Field::Null)] |
| )], |
| row![( |
| "b_struct".to_string(), |
| group![("b_c_int".to_string(), Field::Null)] |
| )], |
| ]; |
| assert_eq!(rows, expected_rows); |
| } |
| |
| #[test] |
| fn test_file_reader_rows_nonnullable() { |
| let rows = test_file_reader_rows("nonnullable.impala.parquet", None).unwrap(); |
| let expected_rows = vec![row![ |
| ("ID".to_string(), Field::Long(8)), |
| ("Int_Array".to_string(), list![Field::Int(-1)]), |
| ( |
| "int_array_array".to_string(), |
| list![list![Field::Int(-1), Field::Int(-2)], list![]] |
| ), |
| ( |
| "Int_Map".to_string(), |
| map![(Field::Str("k1".to_string()), Field::Int(-1))] |
| ), |
| ( |
| "int_map_array".to_string(), |
| list![ |
| map![], |
| map![(Field::Str("k1".to_string()), Field::Int(1))], |
| map![], |
| map![] |
| ] |
| ), |
| ( |
| "nested_Struct".to_string(), |
| group![ |
| ("a".to_string(), Field::Int(-1)), |
| ("B".to_string(), list![Field::Int(-1)]), |
| ( |
| "c".to_string(), |
| group![( |
| "D".to_string(), |
| list![list![group![ |
| ("e".to_string(), Field::Int(-1)), |
| ("f".to_string(), Field::Str("nonnullable".to_string())) |
| ]]] |
| )] |
| ), |
| ("G".to_string(), map![]) |
| ] |
| ) |
| ]]; |
| assert_eq!(rows, expected_rows); |
| } |
| |
| #[test] |
| fn test_file_reader_rows_nullable() { |
| let rows = test_file_reader_rows("nullable.impala.parquet", None).unwrap(); |
| let expected_rows = vec![ |
| row![ |
| ("id".to_string(), Field::Long(1)), |
| ( |
| "int_array".to_string(), |
| list![Field::Int(1), Field::Int(2), Field::Int(3)] |
| ), |
| ( |
| "int_array_Array".to_string(), |
| list![ |
| list![Field::Int(1), Field::Int(2)], |
| list![Field::Int(3), Field::Int(4)] |
| ] |
| ), |
| ( |
| "int_map".to_string(), |
| map![ |
| (Field::Str("k1".to_string()), Field::Int(1)), |
| (Field::Str("k2".to_string()), Field::Int(100)) |
| ] |
| ), |
| ( |
| "int_Map_Array".to_string(), |
| list![map![(Field::Str("k1".to_string()), Field::Int(1))]] |
| ), |
| ( |
| "nested_struct".to_string(), |
| group![ |
| ("A".to_string(), Field::Int(1)), |
| ("b".to_string(), list![Field::Int(1)]), |
| ( |
| "C".to_string(), |
| group![( |
| "d".to_string(), |
| list![ |
| list![ |
| group![ |
| ("E".to_string(), Field::Int(10)), |
| ( |
| "F".to_string(), |
| Field::Str("aaa".to_string()) |
| ) |
| ], |
| group![ |
| ("E".to_string(), Field::Int(-10)), |
| ( |
| "F".to_string(), |
| Field::Str("bbb".to_string()) |
| ) |
| ] |
| ], |
| list![group![ |
| ("E".to_string(), Field::Int(11)), |
| ("F".to_string(), Field::Str("c".to_string())) |
| ]] |
| ] |
| )] |
| ), |
| ( |
| "g".to_string(), |
| map![( |
| Field::Str("foo".to_string()), |
| group![( |
| "H".to_string(), |
| group![("i".to_string(), list![Field::Double(1.1)])] |
| )] |
| )] |
| ) |
| ] |
| ) |
| ], |
| row![ |
| ("id".to_string(), Field::Long(2)), |
| ( |
| "int_array".to_string(), |
| list![ |
| Field::Null, |
| Field::Int(1), |
| Field::Int(2), |
| Field::Null, |
| Field::Int(3), |
| Field::Null |
| ] |
| ), |
| ( |
| "int_array_Array".to_string(), |
| list![ |
| list![Field::Null, Field::Int(1), Field::Int(2), Field::Null], |
| list![Field::Int(3), Field::Null, Field::Int(4)], |
| list![], |
| Field::Null |
| ] |
| ), |
| ( |
| "int_map".to_string(), |
| map![ |
| (Field::Str("k1".to_string()), Field::Int(2)), |
| (Field::Str("k2".to_string()), Field::Null) |
| ] |
| ), |
| ( |
| "int_Map_Array".to_string(), |
| list![ |
| map![ |
| (Field::Str("k3".to_string()), Field::Null), |
| (Field::Str("k1".to_string()), Field::Int(1)) |
| ], |
| Field::Null, |
| map![] |
| ] |
| ), |
| ( |
| "nested_struct".to_string(), |
| group![ |
| ("A".to_string(), Field::Null), |
| ("b".to_string(), list![Field::Null]), |
| ( |
| "C".to_string(), |
| group![( |
| "d".to_string(), |
| list![ |
| list![ |
| group![ |
| ("E".to_string(), Field::Null), |
| ("F".to_string(), Field::Null) |
| ], |
| group![ |
| ("E".to_string(), Field::Int(10)), |
| ( |
| "F".to_string(), |
| Field::Str("aaa".to_string()) |
| ) |
| ], |
| group![ |
| ("E".to_string(), Field::Null), |
| ("F".to_string(), Field::Null) |
| ], |
| group![ |
| ("E".to_string(), Field::Int(-10)), |
| ( |
| "F".to_string(), |
| Field::Str("bbb".to_string()) |
| ) |
| ], |
| group![ |
| ("E".to_string(), Field::Null), |
| ("F".to_string(), Field::Null) |
| ] |
| ], |
| list![ |
| group![ |
| ("E".to_string(), Field::Int(11)), |
| ( |
| "F".to_string(), |
| Field::Str("c".to_string()) |
| ) |
| ], |
| Field::Null |
| ], |
| list![], |
| Field::Null |
| ] |
| )] |
| ), |
| ( |
| "g".to_string(), |
| map![ |
| ( |
| Field::Str("g1".to_string()), |
| group![( |
| "H".to_string(), |
| group![( |
| "i".to_string(), |
| list![Field::Double(2.2), Field::Null] |
| )] |
| )] |
| ), |
| ( |
| Field::Str("g2".to_string()), |
| group![( |
| "H".to_string(), |
| group![("i".to_string(), list![])] |
| )] |
| ), |
| (Field::Str("g3".to_string()), Field::Null), |
| ( |
| Field::Str("g4".to_string()), |
| group![( |
| "H".to_string(), |
| group![("i".to_string(), Field::Null)] |
| )] |
| ), |
| ( |
| Field::Str("g5".to_string()), |
| group![("H".to_string(), Field::Null)] |
| ) |
| ] |
| ) |
| ] |
| ) |
| ], |
| row![ |
| ("id".to_string(), Field::Long(3)), |
| ("int_array".to_string(), list![]), |
| ("int_array_Array".to_string(), list![Field::Null]), |
| ("int_map".to_string(), map![]), |
| ("int_Map_Array".to_string(), list![Field::Null, Field::Null]), |
| ( |
| "nested_struct".to_string(), |
| group![ |
| ("A".to_string(), Field::Null), |
| ("b".to_string(), Field::Null), |
| ("C".to_string(), group![("d".to_string(), list![])]), |
| ("g".to_string(), map![]) |
| ] |
| ) |
| ], |
| row![ |
| ("id".to_string(), Field::Long(4)), |
| ("int_array".to_string(), Field::Null), |
| ("int_array_Array".to_string(), list![]), |
| ("int_map".to_string(), map![]), |
| ("int_Map_Array".to_string(), list![]), |
| ( |
| "nested_struct".to_string(), |
| group![ |
| ("A".to_string(), Field::Null), |
| ("b".to_string(), Field::Null), |
| ("C".to_string(), group![("d".to_string(), Field::Null)]), |
| ("g".to_string(), Field::Null) |
| ] |
| ) |
| ], |
| row![ |
| ("id".to_string(), Field::Long(5)), |
| ("int_array".to_string(), Field::Null), |
| ("int_array_Array".to_string(), Field::Null), |
| ("int_map".to_string(), map![]), |
| ("int_Map_Array".to_string(), Field::Null), |
| ( |
| "nested_struct".to_string(), |
| group![ |
| ("A".to_string(), Field::Null), |
| ("b".to_string(), Field::Null), |
| ("C".to_string(), Field::Null), |
| ( |
| "g".to_string(), |
| map![( |
| Field::Str("foo".to_string()), |
| group![( |
| "H".to_string(), |
| group![( |
| "i".to_string(), |
| list![Field::Double(2.2), Field::Double(3.3)] |
| )] |
| )] |
| )] |
| ) |
| ] |
| ) |
| ], |
| row![ |
| ("id".to_string(), Field::Long(6)), |
| ("int_array".to_string(), Field::Null), |
| ("int_array_Array".to_string(), Field::Null), |
| ("int_map".to_string(), Field::Null), |
| ("int_Map_Array".to_string(), Field::Null), |
| ("nested_struct".to_string(), Field::Null) |
| ], |
| row![ |
| ("id".to_string(), Field::Long(7)), |
| ("int_array".to_string(), Field::Null), |
| ( |
| "int_array_Array".to_string(), |
| list![Field::Null, list![Field::Int(5), Field::Int(6)]] |
| ), |
| ( |
| "int_map".to_string(), |
| map![ |
| (Field::Str("k1".to_string()), Field::Null), |
| (Field::Str("k3".to_string()), Field::Null) |
| ] |
| ), |
| ("int_Map_Array".to_string(), Field::Null), |
| ( |
| "nested_struct".to_string(), |
| group![ |
| ("A".to_string(), Field::Int(7)), |
| ( |
| "b".to_string(), |
| list![Field::Int(2), Field::Int(3), Field::Null] |
| ), |
| ( |
| "C".to_string(), |
| group![( |
| "d".to_string(), |
| list![list![], list![Field::Null], Field::Null] |
| )] |
| ), |
| ("g".to_string(), Field::Null) |
| ] |
| ) |
| ], |
| ]; |
| assert_eq!(rows, expected_rows); |
| } |
| |
| #[test] |
| fn test_file_reader_rows_projection() { |
| let schema = " |
| message spark_schema { |
| REQUIRED DOUBLE c; |
| REQUIRED INT32 b; |
| } |
| "; |
| let schema = parse_message_type(&schema).unwrap(); |
| let rows = |
| test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap(); |
| let expected_rows = vec![ |
| row![ |
| ("c".to_string(), Field::Double(1.0)), |
| ("b".to_string(), Field::Int(1)) |
| ], |
| row![ |
| ("c".to_string(), Field::Double(1.0)), |
| ("b".to_string(), Field::Int(1)) |
| ], |
| row![ |
| ("c".to_string(), Field::Double(1.0)), |
| ("b".to_string(), Field::Int(1)) |
| ], |
| row![ |
| ("c".to_string(), Field::Double(1.0)), |
| ("b".to_string(), Field::Int(1)) |
| ], |
| row![ |
| ("c".to_string(), Field::Double(1.0)), |
| ("b".to_string(), Field::Int(1)) |
| ], |
| row![ |
| ("c".to_string(), Field::Double(1.0)), |
| ("b".to_string(), Field::Int(1)) |
| ], |
| ]; |
| assert_eq!(rows, expected_rows); |
| } |
| |
| #[test] |
| fn test_iter_columns_in_row() { |
| let r = row![ |
| ("c".to_string(), Field::Double(1.0)), |
| ("b".to_string(), Field::Int(1)) |
| ]; |
| let mut result = Vec::new(); |
| for (name, record) in r.get_column_iter() { |
| result.push((name, record)); |
| } |
| assert_eq!( |
| vec![ |
| (&"c".to_string(), &Field::Double(1.0)), |
| (&"b".to_string(), &Field::Int(1)) |
| ], |
| result |
| ); |
| } |
| |
| #[test] |
| fn test_file_reader_rows_projection_map() { |
| let schema = " |
| message spark_schema { |
| OPTIONAL group a (MAP) { |
| REPEATED group key_value { |
| REQUIRED BYTE_ARRAY key (UTF8); |
| OPTIONAL group value (MAP) { |
| REPEATED group key_value { |
| REQUIRED INT32 key; |
| REQUIRED BOOLEAN value; |
| } |
| } |
| } |
| } |
| } |
| "; |
| let schema = parse_message_type(&schema).unwrap(); |
| let rows = |
| test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap(); |
| let expected_rows = vec![ |
| row![( |
| "a".to_string(), |
| map![( |
| Field::Str("a".to_string()), |
| map![ |
| (Field::Int(1), Field::Bool(true)), |
| (Field::Int(2), Field::Bool(false)) |
| ] |
| )] |
| )], |
| row![( |
| "a".to_string(), |
| map![( |
| Field::Str("b".to_string()), |
| map![(Field::Int(1), Field::Bool(true))] |
| )] |
| )], |
| row![( |
| "a".to_string(), |
| map![(Field::Str("c".to_string()), Field::Null)] |
| )], |
| row![("a".to_string(), map![(Field::Str("d".to_string()), map![])])], |
| row![( |
| "a".to_string(), |
| map![( |
| Field::Str("e".to_string()), |
| map![(Field::Int(1), Field::Bool(true))] |
| )] |
| )], |
| row![( |
| "a".to_string(), |
| map![( |
| Field::Str("f".to_string()), |
| map![ |
| (Field::Int(3), Field::Bool(true)), |
| (Field::Int(4), Field::Bool(false)), |
| (Field::Int(5), Field::Bool(true)) |
| ] |
| )] |
| )], |
| ]; |
| assert_eq!(rows, expected_rows); |
| } |
| |
| #[test] |
| fn test_file_reader_rows_projection_list() { |
| let schema = " |
| message spark_schema { |
| OPTIONAL group a (LIST) { |
| REPEATED group list { |
| OPTIONAL group element (LIST) { |
| REPEATED group list { |
| OPTIONAL group element (LIST) { |
| REPEATED group list { |
| OPTIONAL BYTE_ARRAY element (UTF8); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| "; |
| let schema = parse_message_type(&schema).unwrap(); |
| let rows = |
| test_file_reader_rows("nested_lists.snappy.parquet", Some(schema)).unwrap(); |
| let expected_rows = vec![ |
| row![( |
| "a".to_string(), |
| list![ |
| list![ |
| list![Field::Str("a".to_string()), Field::Str("b".to_string())], |
| list![Field::Str("c".to_string())] |
| ], |
| list![Field::Null, list![Field::Str("d".to_string())]] |
| ] |
| )], |
| row![( |
| "a".to_string(), |
| list![ |
| list![ |
| list![Field::Str("a".to_string()), Field::Str("b".to_string())], |
| list![Field::Str("c".to_string()), Field::Str("d".to_string())] |
| ], |
| list![Field::Null, list![Field::Str("e".to_string())]] |
| ] |
| )], |
| row![( |
| "a".to_string(), |
| list![ |
| list![ |
| list![Field::Str("a".to_string()), Field::Str("b".to_string())], |
| list![Field::Str("c".to_string()), Field::Str("d".to_string())], |
| list![Field::Str("e".to_string())] |
| ], |
| list![Field::Null, list![Field::Str("f".to_string())]] |
| ] |
| )], |
| ]; |
| assert_eq!(rows, expected_rows); |
| } |
| |
| #[test] |
| fn test_file_reader_rows_invalid_projection() { |
| let schema = " |
| message spark_schema { |
| REQUIRED INT32 key; |
| REQUIRED BOOLEAN value; |
| } |
| "; |
| let schema = parse_message_type(&schema).unwrap(); |
| let res = test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)); |
| assert!(res.is_err()); |
| assert_eq!( |
| res.unwrap_err(), |
| general_err!("Root schema does not contain projection") |
| ); |
| } |
| |
| #[test] |
| fn test_row_group_rows_invalid_projection() { |
| let schema = " |
| message spark_schema { |
| REQUIRED INT32 key; |
| REQUIRED BOOLEAN value; |
| } |
| "; |
| let schema = parse_message_type(&schema).unwrap(); |
| let res = test_row_group_rows("nested_maps.snappy.parquet", Some(schema)); |
| assert!(res.is_err()); |
| assert_eq!( |
| res.unwrap_err(), |
| general_err!("Root schema does not contain projection") |
| ); |
| } |
| |
| #[test] |
| #[should_panic(expected = "Invalid map type")] |
| fn test_file_reader_rows_invalid_map_type() { |
| let schema = " |
| message spark_schema { |
| OPTIONAL group a (MAP) { |
| REPEATED group key_value { |
| REQUIRED BYTE_ARRAY key (UTF8); |
| OPTIONAL group value (MAP) { |
| REPEATED group key_value { |
| REQUIRED INT32 key; |
| } |
| } |
| } |
| } |
| } |
| "; |
| let schema = parse_message_type(&schema).unwrap(); |
| test_file_reader_rows("nested_maps.snappy.parquet", Some(schema)).unwrap(); |
| } |
| |
| #[test] |
| fn test_file_reader_iter() { |
| let path = get_test_path("alltypes_plain.parquet"); |
| let vec = vec![path] |
| .iter() |
| .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) |
| .flat_map(|r| RowIter::from_file_into(Box::new(r))) |
| .flat_map(|r| r.get_int(0)) |
| .collect::<Vec<_>>(); |
| |
| assert_eq!(vec, vec![4, 5, 6, 7, 2, 3, 0, 1]); |
| } |
| |
| #[test] |
| fn test_file_reader_iter_projection() { |
| let path = get_test_path("alltypes_plain.parquet"); |
| let values = vec![path] |
| .iter() |
| .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) |
| .flat_map(|r| { |
| let schema = "message schema { OPTIONAL INT32 id; }"; |
| let proj = parse_message_type(&schema).ok(); |
| |
| RowIter::from_file_into(Box::new(r)).project(proj).unwrap() |
| }) |
| .map(|r| format!("id:{}", r.fmt(0))) |
| .collect::<Vec<_>>() |
| .join(", "); |
| |
| assert_eq!(values, "id:4, id:5, id:6, id:7, id:2, id:3, id:0, id:1"); |
| } |
| |
| #[test] |
| fn test_file_reader_iter_projection_err() { |
| let schema = " |
| message spark_schema { |
| REQUIRED INT32 key; |
| REQUIRED BOOLEAN value; |
| } |
| "; |
| let proj = parse_message_type(&schema).ok(); |
| let path = get_test_path("nested_maps.snappy.parquet"); |
| let reader = SerializedFileReader::try_from(path.as_path()).unwrap(); |
| let res = RowIter::from_file_into(Box::new(reader)).project(proj); |
| |
| assert!(res.is_err()); |
| assert_eq!( |
| res.err().unwrap(), |
| general_err!("Root schema does not contain projection") |
| ); |
| } |
| |
| #[test] |
| fn test_tree_reader_handle_repeated_fields_with_no_annotation() { |
| // Array field `phoneNumbers` does not contain LIST annotation. |
| // We parse it as struct with `phone` repeated field as array. |
| let rows = test_file_reader_rows("repeated_no_annotation.parquet", None).unwrap(); |
| let expected_rows = vec![ |
| row![ |
| ("id".to_string(), Field::Int(1)), |
| ("phoneNumbers".to_string(), Field::Null) |
| ], |
| row![ |
| ("id".to_string(), Field::Int(2)), |
| ("phoneNumbers".to_string(), Field::Null) |
| ], |
| row![ |
| ("id".to_string(), Field::Int(3)), |
| ( |
| "phoneNumbers".to_string(), |
| group![("phone".to_string(), list![])] |
| ) |
| ], |
| row![ |
| ("id".to_string(), Field::Int(4)), |
| ( |
| "phoneNumbers".to_string(), |
| group![( |
| "phone".to_string(), |
| list![group![ |
| ("number".to_string(), Field::Long(5555555555)), |
| ("kind".to_string(), Field::Null) |
| ]] |
| )] |
| ) |
| ], |
| row![ |
| ("id".to_string(), Field::Int(5)), |
| ( |
| "phoneNumbers".to_string(), |
| group![( |
| "phone".to_string(), |
| list![group![ |
| ("number".to_string(), Field::Long(1111111111)), |
| ("kind".to_string(), Field::Str("home".to_string())) |
| ]] |
| )] |
| ) |
| ], |
| row![ |
| ("id".to_string(), Field::Int(6)), |
| ( |
| "phoneNumbers".to_string(), |
| group![( |
| "phone".to_string(), |
| list![ |
| group![ |
| ("number".to_string(), Field::Long(1111111111)), |
| ("kind".to_string(), Field::Str("home".to_string())) |
| ], |
| group![ |
| ("number".to_string(), Field::Long(2222222222)), |
| ("kind".to_string(), Field::Null) |
| ], |
| group![ |
| ("number".to_string(), Field::Long(3333333333)), |
| ("kind".to_string(), Field::Str("mobile".to_string())) |
| ] |
| ] |
| )] |
| ) |
| ], |
| ]; |
| |
| assert_eq!(rows, expected_rows); |
| } |
| |
| fn test_file_reader_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> { |
| let file = get_test_file(file_name); |
| let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?); |
| let iter = file_reader.get_row_iter(schema)?; |
| Ok(iter.collect()) |
| } |
| |
| fn test_row_group_rows(file_name: &str, schema: Option<Type>) -> Result<Vec<Row>> { |
| let file = get_test_file(file_name); |
| let file_reader: Box<dyn FileReader> = Box::new(SerializedFileReader::new(file)?); |
| // Check the first row group only, because files will contain only single row |
| // group |
| let row_group_reader = file_reader.get_row_group(0).unwrap(); |
| let iter = row_group_reader.get_row_iter(schema)?; |
| Ok(iter.collect()) |
| } |
| } |