| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use crate::basic::Type as PhysicalType; |
| use crate::column::reader::{get_typed_column_reader, ColumnReader, ColumnReaderImpl}; |
| use crate::data_type::*; |
| use crate::errors::{ParquetError, Result}; |
| use crate::record::api::Field; |
| use crate::schema::types::ColumnDescPtr; |
| |
| /// Macro to generate simple functions that cover all types of triplet iterator. |
| /// $func is a function of a typed triplet iterator and $token is a either {`ref`} or |
| /// {`ref`, `mut`} |
| macro_rules! triplet_enum_func { |
| ($self:ident, $func:ident, $( $token:tt ),*) => ({ |
| match *$self { |
| TripletIter::BoolTripletIter($($token)* typed) => typed.$func(), |
| TripletIter::Int32TripletIter($($token)* typed) => typed.$func(), |
| TripletIter::Int64TripletIter($($token)* typed) => typed.$func(), |
| TripletIter::Int96TripletIter($($token)* typed) => typed.$func(), |
| TripletIter::FloatTripletIter($($token)* typed) => typed.$func(), |
| TripletIter::DoubleTripletIter($($token)* typed) => typed.$func(), |
| TripletIter::ByteArrayTripletIter($($token)* typed) => typed.$func(), |
| TripletIter::FixedLenByteArrayTripletIter($($token)* typed) => typed.$func() |
| } |
| }); |
| } |
| |
| /// High level API wrapper on column reader. |
| /// Provides per-element access for each primitive column. |
| pub enum TripletIter { |
| BoolTripletIter(TypedTripletIter<BoolType>), |
| Int32TripletIter(TypedTripletIter<Int32Type>), |
| Int64TripletIter(TypedTripletIter<Int64Type>), |
| Int96TripletIter(TypedTripletIter<Int96Type>), |
| FloatTripletIter(TypedTripletIter<FloatType>), |
| DoubleTripletIter(TypedTripletIter<DoubleType>), |
| ByteArrayTripletIter(TypedTripletIter<ByteArrayType>), |
| FixedLenByteArrayTripletIter(TypedTripletIter<FixedLenByteArrayType>), |
| } |
| |
| impl TripletIter { |
| /// Creates new triplet for column reader |
| pub fn new(descr: ColumnDescPtr, reader: ColumnReader, batch_size: usize) -> Self { |
| match descr.physical_type() { |
| PhysicalType::BOOLEAN => TripletIter::BoolTripletIter(TypedTripletIter::new( |
| descr, batch_size, reader, |
| )), |
| PhysicalType::INT32 => TripletIter::Int32TripletIter(TypedTripletIter::new( |
| descr, batch_size, reader, |
| )), |
| PhysicalType::INT64 => TripletIter::Int64TripletIter(TypedTripletIter::new( |
| descr, batch_size, reader, |
| )), |
| PhysicalType::INT96 => TripletIter::Int96TripletIter(TypedTripletIter::new( |
| descr, batch_size, reader, |
| )), |
| PhysicalType::FLOAT => TripletIter::FloatTripletIter(TypedTripletIter::new( |
| descr, batch_size, reader, |
| )), |
| PhysicalType::DOUBLE => TripletIter::DoubleTripletIter( |
| TypedTripletIter::new(descr, batch_size, reader), |
| ), |
| PhysicalType::BYTE_ARRAY => TripletIter::ByteArrayTripletIter( |
| TypedTripletIter::new(descr, batch_size, reader), |
| ), |
| PhysicalType::FIXED_LEN_BYTE_ARRAY => { |
| TripletIter::FixedLenByteArrayTripletIter(TypedTripletIter::new( |
| descr, batch_size, reader, |
| )) |
| } |
| } |
| } |
| |
| /// Invokes underlying typed triplet iterator to buffer current value. |
| /// Should be called once - either before `is_null` or `current_value`. |
| #[inline] |
| pub fn read_next(&mut self) -> Result<bool> { |
| triplet_enum_func!(self, read_next, ref, mut) |
| } |
| |
| /// Provides check on values/levels left without invoking the underlying typed triplet |
| /// iterator. |
| /// Returns true if more values/levels exist, false otherwise. |
| /// It is always in sync with `read_next` method. |
| #[inline] |
| pub fn has_next(&self) -> bool { |
| triplet_enum_func!(self, has_next, ref) |
| } |
| |
| /// Returns current definition level for a leaf triplet iterator |
| #[inline] |
| pub fn current_def_level(&self) -> i16 { |
| triplet_enum_func!(self, current_def_level, ref) |
| } |
| |
| /// Returns max definition level for a leaf triplet iterator |
| #[inline] |
| pub fn max_def_level(&self) -> i16 { |
| triplet_enum_func!(self, max_def_level, ref) |
| } |
| |
| /// Returns current repetition level for a leaf triplet iterator |
| #[inline] |
| pub fn current_rep_level(&self) -> i16 { |
| triplet_enum_func!(self, current_rep_level, ref) |
| } |
| |
| /// Returns max repetition level for a leaf triplet iterator |
| #[inline] |
| pub fn max_rep_level(&self) -> i16 { |
| triplet_enum_func!(self, max_rep_level, ref) |
| } |
| |
| /// Returns true, if current value is null. |
| /// Based on the fact that for non-null value current definition level |
| /// equals to max definition level. |
| #[inline] |
| pub fn is_null(&self) -> bool { |
| self.current_def_level() < self.max_def_level() |
| } |
| |
| /// Updates non-null value for current row. |
| pub fn current_value(&self) -> Field { |
| assert!(!self.is_null(), "Value is null"); |
| match *self { |
| TripletIter::BoolTripletIter(ref typed) => { |
| Field::convert_bool(typed.column_descr(), *typed.current_value()) |
| } |
| TripletIter::Int32TripletIter(ref typed) => { |
| Field::convert_int32(typed.column_descr(), *typed.current_value()) |
| } |
| TripletIter::Int64TripletIter(ref typed) => { |
| Field::convert_int64(typed.column_descr(), *typed.current_value()) |
| } |
| TripletIter::Int96TripletIter(ref typed) => { |
| Field::convert_int96(typed.column_descr(), typed.current_value().clone()) |
| } |
| TripletIter::FloatTripletIter(ref typed) => { |
| Field::convert_float(typed.column_descr(), *typed.current_value()) |
| } |
| TripletIter::DoubleTripletIter(ref typed) => { |
| Field::convert_double(typed.column_descr(), *typed.current_value()) |
| } |
| TripletIter::ByteArrayTripletIter(ref typed) => Field::convert_byte_array( |
| typed.column_descr(), |
| typed.current_value().clone(), |
| ), |
| TripletIter::FixedLenByteArrayTripletIter(ref typed) => { |
| Field::convert_byte_array( |
| typed.column_descr(), |
| typed.current_value().clone().into(), |
| ) |
| } |
| } |
| } |
| } |
| |
| /// Internal typed triplet iterator as a wrapper for column reader |
| /// (primitive leaf column), provides per-element access. |
| pub struct TypedTripletIter<T: DataType> { |
| reader: ColumnReaderImpl<T>, |
| column_descr: ColumnDescPtr, |
| batch_size: usize, |
| // type properties |
| max_def_level: i16, |
| max_rep_level: i16, |
| // values and levels |
| values: Vec<T::T>, |
| def_levels: Option<Vec<i16>>, |
| rep_levels: Option<Vec<i16>>, |
| // current index for the triplet (value, def, rep) |
| curr_triplet_index: usize, |
| // how many triplets are left before we need to buffer |
| triplets_left: usize, |
| // helper flag to quickly check if we have more values/levels to read |
| has_next: bool, |
| } |
| |
| impl<T: DataType> TypedTripletIter<T> { |
| /// Creates new typed triplet iterator based on provided column reader. |
| /// Use batch size to specify the amount of values to buffer from column reader. |
| fn new(descr: ColumnDescPtr, batch_size: usize, column_reader: ColumnReader) -> Self { |
| assert!( |
| batch_size > 0, |
| "Expected positive batch size, found: {}", |
| batch_size |
| ); |
| |
| let max_def_level = descr.max_def_level(); |
| let max_rep_level = descr.max_rep_level(); |
| |
| let def_levels = if max_def_level == 0 { |
| None |
| } else { |
| Some(vec![0; batch_size]) |
| }; |
| let rep_levels = if max_rep_level == 0 { |
| None |
| } else { |
| Some(vec![0; batch_size]) |
| }; |
| |
| Self { |
| reader: get_typed_column_reader(column_reader), |
| column_descr: descr, |
| batch_size, |
| max_def_level, |
| max_rep_level, |
| values: vec![T::T::default(); batch_size], |
| def_levels, |
| rep_levels, |
| curr_triplet_index: 0, |
| triplets_left: 0, |
| has_next: false, |
| } |
| } |
| |
| /// Returns column descriptor reference for the current typed triplet iterator. |
| #[inline] |
| pub fn column_descr(&self) -> &ColumnDescPtr { |
| &self.column_descr |
| } |
| |
| /// Returns maximum definition level for the triplet iterator (leaf column). |
| #[inline] |
| fn max_def_level(&self) -> i16 { |
| self.max_def_level |
| } |
| |
| /// Returns maximum repetition level for the triplet iterator (leaf column). |
| #[inline] |
| fn max_rep_level(&self) -> i16 { |
| self.max_rep_level |
| } |
| |
| /// Returns current value. |
| /// Method does not advance the iterator, therefore can be called multiple times. |
| #[inline] |
| fn current_value(&self) -> &T::T { |
| assert!( |
| self.current_def_level() == self.max_def_level(), |
| "Cannot extract value, max definition level: {}, current level: {}", |
| self.max_def_level(), |
| self.current_def_level() |
| ); |
| &self.values[self.curr_triplet_index] |
| } |
| |
| /// Returns current definition level. |
| /// If field is required, then maximum definition level is returned. |
| #[inline] |
| fn current_def_level(&self) -> i16 { |
| match self.def_levels { |
| Some(ref vec) => vec[self.curr_triplet_index], |
| None => self.max_def_level, |
| } |
| } |
| |
| /// Returns current repetition level. |
| /// If field is required, then maximum repetition level is returned. |
| #[inline] |
| fn current_rep_level(&self) -> i16 { |
| match self.rep_levels { |
| Some(ref vec) => vec[self.curr_triplet_index], |
| None => self.max_rep_level, |
| } |
| } |
| |
| /// Quick check if iterator has more values/levels to read. |
| /// It is updated as a result of `read_next` method, so they are synchronized. |
| #[inline] |
| fn has_next(&self) -> bool { |
| self.has_next |
| } |
| |
| /// Advances to the next triplet. |
| /// Returns true, if there are more records to read, false there are no records left. |
| fn read_next(&mut self) -> Result<bool> { |
| self.curr_triplet_index += 1; |
| |
| if self.curr_triplet_index >= self.triplets_left { |
| let (values_read, levels_read) = { |
| // Get slice of definition levels, if available |
| let def_levels = self.def_levels.as_mut().map(|vec| &mut vec[..]); |
| |
| // Get slice of repetition levels, if available |
| let rep_levels = self.rep_levels.as_mut().map(|vec| &mut vec[..]); |
| |
| // Buffer triplets |
| self.reader.read_batch( |
| self.batch_size, |
| def_levels, |
| rep_levels, |
| &mut self.values, |
| )? |
| }; |
| |
| // No more values or levels to read |
| if values_read == 0 && levels_read == 0 { |
| self.has_next = false; |
| return Ok(false); |
| } |
| |
| // We never read values more than levels |
| if levels_read == 0 || values_read == levels_read { |
| // There are no definition levels to read, column is required |
| // or definition levels match values, so it does not require spacing |
| self.curr_triplet_index = 0; |
| self.triplets_left = values_read; |
| } else if values_read < levels_read { |
| // Add spacing for triplets. |
| // The idea is setting values for positions in def_levels when current |
| // definition level equals to maximum definition level. |
| // Values and levels are guaranteed to line up, because of |
| // the column reader method. |
| |
| // Note: if values_read == 0, then spacing will not be triggered |
| let mut idx = values_read; |
| let def_levels = self.def_levels.as_ref().unwrap(); |
| for i in 0..levels_read { |
| if def_levels[levels_read - i - 1] == self.max_def_level { |
| idx -= 1; // This is done to avoid usize becoming a negative value |
| self.values.swap(levels_read - i - 1, idx); |
| } |
| } |
| self.curr_triplet_index = 0; |
| self.triplets_left = levels_read; |
| } else { |
| return Err(general_err!( |
| "Spacing of values/levels is wrong, values_read: {}, levels_read: {}", |
| values_read, |
| levels_read |
| )); |
| } |
| } |
| |
| self.has_next = true; |
| Ok(true) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| use crate::file::reader::{FileReader, SerializedFileReader}; |
| use crate::schema::types::ColumnPath; |
| use crate::util::test_common::get_test_file; |
| |
| #[test] |
| #[should_panic(expected = "Expected positive batch size, found: 0")] |
| fn test_triplet_zero_batch_size() { |
| let column_path = |
| ColumnPath::from(vec!["b_struct".to_string(), "b_c_int".to_string()]); |
| test_column_in_file("nulls.snappy.parquet", 0, &column_path, &[], &[], &[]); |
| } |
| |
| #[test] |
| fn test_triplet_null_column() { |
| let path = vec!["b_struct", "b_c_int"]; |
| let values = vec![]; |
| let def_levels = vec![1, 1, 1, 1, 1, 1, 1, 1]; |
| let rep_levels = vec![0, 0, 0, 0, 0, 0, 0, 0]; |
| test_triplet_iter( |
| "nulls.snappy.parquet", |
| path, |
| &values, |
| &def_levels, |
| &rep_levels, |
| ); |
| } |
| |
| #[test] |
| fn test_triplet_required_column() { |
| let path = vec!["ID"]; |
| let values = vec![Field::Long(8)]; |
| let def_levels = vec![0]; |
| let rep_levels = vec![0]; |
| test_triplet_iter( |
| "nonnullable.impala.parquet", |
| path, |
| &values, |
| &def_levels, |
| &rep_levels, |
| ); |
| } |
| |
| #[test] |
| fn test_triplet_optional_column() { |
| let path = vec!["nested_struct", "A"]; |
| let values = vec![Field::Int(1), Field::Int(7)]; |
| let def_levels = vec![2, 1, 1, 1, 1, 0, 2]; |
| let rep_levels = vec![0, 0, 0, 0, 0, 0, 0]; |
| test_triplet_iter( |
| "nullable.impala.parquet", |
| path, |
| &values, |
| &def_levels, |
| &rep_levels, |
| ); |
| } |
| |
| #[test] |
| fn test_triplet_optional_list_column() { |
| let path = vec!["a", "list", "element", "list", "element", "list", "element"]; |
| let values = vec![ |
| Field::Str("a".to_string()), |
| Field::Str("b".to_string()), |
| Field::Str("c".to_string()), |
| Field::Str("d".to_string()), |
| Field::Str("a".to_string()), |
| Field::Str("b".to_string()), |
| Field::Str("c".to_string()), |
| Field::Str("d".to_string()), |
| Field::Str("e".to_string()), |
| Field::Str("a".to_string()), |
| Field::Str("b".to_string()), |
| Field::Str("c".to_string()), |
| Field::Str("d".to_string()), |
| Field::Str("e".to_string()), |
| Field::Str("f".to_string()), |
| ]; |
| let def_levels = vec![7, 7, 7, 4, 7, 7, 7, 7, 7, 4, 7, 7, 7, 7, 7, 7, 4, 7]; |
| let rep_levels = vec![0, 3, 2, 1, 2, 0, 3, 2, 3, 1, 2, 0, 3, 2, 3, 2, 1, 2]; |
| test_triplet_iter( |
| "nested_lists.snappy.parquet", |
| path, |
| &values, |
| &def_levels, |
| &rep_levels, |
| ); |
| } |
| |
| #[test] |
| fn test_triplet_optional_map_column() { |
| let path = vec!["a", "key_value", "value", "key_value", "key"]; |
| let values = vec![ |
| Field::Int(1), |
| Field::Int(2), |
| Field::Int(1), |
| Field::Int(1), |
| Field::Int(3), |
| Field::Int(4), |
| Field::Int(5), |
| ]; |
| let def_levels = vec![4, 4, 4, 2, 3, 4, 4, 4, 4]; |
| let rep_levels = vec![0, 2, 0, 0, 0, 0, 0, 2, 2]; |
| test_triplet_iter( |
| "nested_maps.snappy.parquet", |
| path, |
| &values, |
| &def_levels, |
| &rep_levels, |
| ); |
| } |
| |
| // Check triplet iterator across different batch sizes |
| fn test_triplet_iter( |
| file_name: &str, |
| column_path: Vec<&str>, |
| expected_values: &[Field], |
| expected_def_levels: &[i16], |
| expected_rep_levels: &[i16], |
| ) { |
| // Convert path into column path |
| let path: Vec<String> = column_path.iter().map(|x| x.to_string()).collect(); |
| let column_path = ColumnPath::from(path); |
| |
| let batch_sizes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 128, 256]; |
| for batch_size in batch_sizes { |
| test_column_in_file( |
| file_name, |
| batch_size, |
| &column_path, |
| expected_values, |
| expected_def_levels, |
| expected_rep_levels, |
| ); |
| } |
| } |
| |
| // Check values of a selectd column in a file |
| fn test_column_in_file( |
| file_name: &str, |
| batch_size: usize, |
| column_path: &ColumnPath, |
| expected_values: &[Field], |
| expected_def_levels: &[i16], |
| expected_rep_levels: &[i16], |
| ) { |
| let file = get_test_file(file_name); |
| let file_reader = SerializedFileReader::new(file).unwrap(); |
| let metadata = file_reader.metadata(); |
| // Get schema descriptor |
| let file_metadata = metadata.file_metadata(); |
| let schema = file_metadata.schema_descr(); |
| // Get first row group |
| let row_group_reader = file_reader.get_row_group(0).unwrap(); |
| |
| for i in 0..schema.num_columns() { |
| let descr = schema.column(i); |
| if descr.path() == column_path { |
| let reader = row_group_reader.get_column_reader(i).unwrap(); |
| test_triplet_column( |
| descr, |
| reader, |
| batch_size, |
| expected_values, |
| expected_def_levels, |
| expected_rep_levels, |
| ); |
| } |
| } |
| } |
| |
| // Check values for individual triplet iterator |
| fn test_triplet_column( |
| descr: ColumnDescPtr, |
| reader: ColumnReader, |
| batch_size: usize, |
| expected_values: &[Field], |
| expected_def_levels: &[i16], |
| expected_rep_levels: &[i16], |
| ) { |
| let mut iter = TripletIter::new(descr.clone(), reader, batch_size); |
| let mut values: Vec<Field> = Vec::new(); |
| let mut def_levels: Vec<i16> = Vec::new(); |
| let mut rep_levels: Vec<i16> = Vec::new(); |
| |
| assert_eq!(iter.max_def_level(), descr.max_def_level()); |
| assert_eq!(iter.max_rep_level(), descr.max_rep_level()); |
| |
| while let Ok(true) = iter.read_next() { |
| assert!(iter.has_next()); |
| if !iter.is_null() { |
| values.push(iter.current_value()); |
| } |
| def_levels.push(iter.current_def_level()); |
| rep_levels.push(iter.current_rep_level()); |
| } |
| |
| assert_eq!(values, expected_values); |
| assert_eq!(def_levels, expected_def_levels); |
| assert_eq!(rep_levels, expected_rep_levels); |
| } |
| } |