blob: bb4f942fd18a593eccf089b1448dd01283611147 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use crate::basic::Type as PhysicalType;
use crate::column::reader::{get_typed_column_reader, ColumnReader, ColumnReaderImpl};
use crate::data_type::*;
use crate::errors::{ParquetError, Result};
use crate::record::api::Field;
use crate::schema::types::ColumnDescPtr;
/// Dispatches a zero-argument method call to the typed triplet iterator wrapped
/// inside a `TripletIter` value, whatever its variant.
///
/// `$func` is the method to invoke on the wrapped `TypedTripletIter`. The trailing
/// tokens give the binding mode used to destructure the variant: pass `ref` for
/// shared access, or `ref, mut` for mutable access (expanded as `ref mut`).
macro_rules! triplet_enum_func {
    ($self:ident, $func:ident, $( $binding:tt ),*) => {{
        match *$self {
            TripletIter::BoolTripletIter($( $binding )* inner) => inner.$func(),
            TripletIter::Int32TripletIter($( $binding )* inner) => inner.$func(),
            TripletIter::Int64TripletIter($( $binding )* inner) => inner.$func(),
            TripletIter::Int96TripletIter($( $binding )* inner) => inner.$func(),
            TripletIter::FloatTripletIter($( $binding )* inner) => inner.$func(),
            TripletIter::DoubleTripletIter($( $binding )* inner) => inner.$func(),
            TripletIter::ByteArrayTripletIter($( $binding )* inner) => inner.$func(),
            TripletIter::FixedLenByteArrayTripletIter($( $binding )* inner) => {
                inner.$func()
            }
        }
    }};
}
/// High-level wrapper around a column reader that gives per-element access to a
/// primitive (leaf) column, one variant per Parquet physical type.
pub enum TripletIter {
    /// Iterator over a `BOOLEAN` column.
    BoolTripletIter(TypedTripletIter<BoolType>),
    /// Iterator over an `INT32` column.
    Int32TripletIter(TypedTripletIter<Int32Type>),
    /// Iterator over an `INT64` column.
    Int64TripletIter(TypedTripletIter<Int64Type>),
    /// Iterator over an `INT96` column.
    Int96TripletIter(TypedTripletIter<Int96Type>),
    /// Iterator over a `FLOAT` column.
    FloatTripletIter(TypedTripletIter<FloatType>),
    /// Iterator over a `DOUBLE` column.
    DoubleTripletIter(TypedTripletIter<DoubleType>),
    /// Iterator over a `BYTE_ARRAY` column.
    ByteArrayTripletIter(TypedTripletIter<ByteArrayType>),
    /// Iterator over a `FIXED_LEN_BYTE_ARRAY` column.
    FixedLenByteArrayTripletIter(TypedTripletIter<FixedLenByteArrayType>),
}
impl TripletIter {
/// Creates new triplet for column reader
pub fn new(descr: ColumnDescPtr, reader: ColumnReader, batch_size: usize) -> Self {
match descr.physical_type() {
PhysicalType::BOOLEAN => TripletIter::BoolTripletIter(TypedTripletIter::new(
descr, batch_size, reader,
)),
PhysicalType::INT32 => TripletIter::Int32TripletIter(TypedTripletIter::new(
descr, batch_size, reader,
)),
PhysicalType::INT64 => TripletIter::Int64TripletIter(TypedTripletIter::new(
descr, batch_size, reader,
)),
PhysicalType::INT96 => TripletIter::Int96TripletIter(TypedTripletIter::new(
descr, batch_size, reader,
)),
PhysicalType::FLOAT => TripletIter::FloatTripletIter(TypedTripletIter::new(
descr, batch_size, reader,
)),
PhysicalType::DOUBLE => TripletIter::DoubleTripletIter(
TypedTripletIter::new(descr, batch_size, reader),
),
PhysicalType::BYTE_ARRAY => TripletIter::ByteArrayTripletIter(
TypedTripletIter::new(descr, batch_size, reader),
),
PhysicalType::FIXED_LEN_BYTE_ARRAY => {
TripletIter::FixedLenByteArrayTripletIter(TypedTripletIter::new(
descr, batch_size, reader,
))
}
}
}
/// Invokes underlying typed triplet iterator to buffer current value.
/// Should be called once - either before `is_null` or `current_value`.
#[inline]
pub fn read_next(&mut self) -> Result<bool> {
triplet_enum_func!(self, read_next, ref, mut)
}
/// Provides check on values/levels left without invoking the underlying typed triplet
/// iterator.
/// Returns true if more values/levels exist, false otherwise.
/// It is always in sync with `read_next` method.
#[inline]
pub fn has_next(&self) -> bool {
triplet_enum_func!(self, has_next, ref)
}
/// Returns current definition level for a leaf triplet iterator
#[inline]
pub fn current_def_level(&self) -> i16 {
triplet_enum_func!(self, current_def_level, ref)
}
/// Returns max definition level for a leaf triplet iterator
#[inline]
pub fn max_def_level(&self) -> i16 {
triplet_enum_func!(self, max_def_level, ref)
}
/// Returns current repetition level for a leaf triplet iterator
#[inline]
pub fn current_rep_level(&self) -> i16 {
triplet_enum_func!(self, current_rep_level, ref)
}
/// Returns max repetition level for a leaf triplet iterator
#[inline]
pub fn max_rep_level(&self) -> i16 {
triplet_enum_func!(self, max_rep_level, ref)
}
/// Returns true, if current value is null.
/// Based on the fact that for non-null value current definition level
/// equals to max definition level.
#[inline]
pub fn is_null(&self) -> bool {
self.current_def_level() < self.max_def_level()
}
/// Updates non-null value for current row.
pub fn current_value(&self) -> Field {
assert!(!self.is_null(), "Value is null");
match *self {
TripletIter::BoolTripletIter(ref typed) => {
Field::convert_bool(typed.column_descr(), *typed.current_value())
}
TripletIter::Int32TripletIter(ref typed) => {
Field::convert_int32(typed.column_descr(), *typed.current_value())
}
TripletIter::Int64TripletIter(ref typed) => {
Field::convert_int64(typed.column_descr(), *typed.current_value())
}
TripletIter::Int96TripletIter(ref typed) => {
Field::convert_int96(typed.column_descr(), typed.current_value().clone())
}
TripletIter::FloatTripletIter(ref typed) => {
Field::convert_float(typed.column_descr(), *typed.current_value())
}
TripletIter::DoubleTripletIter(ref typed) => {
Field::convert_double(typed.column_descr(), *typed.current_value())
}
TripletIter::ByteArrayTripletIter(ref typed) => Field::convert_byte_array(
typed.column_descr(),
typed.current_value().clone(),
),
TripletIter::FixedLenByteArrayTripletIter(ref typed) => {
Field::convert_byte_array(
typed.column_descr(),
typed.current_value().clone().into(),
)
}
}
}
}
/// Typed triplet iterator that wraps the column reader of a single primitive
/// (leaf) column and exposes its buffered values and levels element by element.
pub struct TypedTripletIter<T: DataType> {
    /// Typed column reader that batches are read from.
    reader: ColumnReaderImpl<T>,
    /// Descriptor of the leaf column being iterated.
    column_descr: ColumnDescPtr,
    /// Number of values/levels requested from the reader per batch.
    batch_size: usize,
    /// Maximum definition level of the column.
    max_def_level: i16,
    /// Maximum repetition level of the column.
    max_rep_level: i16,
    /// Buffered values; once a batch is spaced they line up with the level buffers.
    values: Vec<T::T>,
    /// Buffered definition levels, absent when the column has none.
    def_levels: Option<Vec<i16>>,
    /// Buffered repetition levels, absent when the column has none.
    rep_levels: Option<Vec<i16>>,
    /// Position of the current (value, def level, rep level) triplet in the buffers.
    curr_triplet_index: usize,
    /// Number of triplets held by the current buffer; reaching it triggers a new read.
    triplets_left: usize,
    /// Cached outcome of the last `read_next` call.
    has_next: bool,
}
impl<T: DataType> TypedTripletIter<T> {
    /// Creates a new typed triplet iterator on top of `column_reader`.
    ///
    /// `batch_size` is the number of values/levels buffered per call to the column
    /// reader. Level buffers are only allocated when the corresponding maximum
    /// level of the column is non-zero.
    ///
    /// # Panics
    ///
    /// Panics if `batch_size` is zero.
    fn new(descr: ColumnDescPtr, batch_size: usize, column_reader: ColumnReader) -> Self {
        assert!(
            batch_size > 0,
            "Expected positive batch size, found: {}",
            batch_size
        );
        let max_def_level = descr.max_def_level();
        let max_rep_level = descr.max_rep_level();
        // No buffer is needed when a max level is 0; the level accessors then
        // report the max level instead of reading a buffer.
        let def_levels = if max_def_level == 0 {
            None
        } else {
            Some(vec![0; batch_size])
        };
        let rep_levels = if max_rep_level == 0 {
            None
        } else {
            Some(vec![0; batch_size])
        };
        Self {
            reader: get_typed_column_reader(column_reader),
            column_descr: descr,
            batch_size,
            max_def_level,
            max_rep_level,
            values: vec![T::T::default(); batch_size],
            def_levels,
            rep_levels,
            curr_triplet_index: 0,
            triplets_left: 0,
            has_next: false,
        }
    }

    /// Returns the column descriptor of this typed triplet iterator.
    #[inline]
    pub fn column_descr(&self) -> &ColumnDescPtr {
        &self.column_descr
    }

    /// Returns the maximum definition level of the leaf column.
    #[inline]
    fn max_def_level(&self) -> i16 {
        self.max_def_level
    }

    /// Returns the maximum repetition level of the leaf column.
    #[inline]
    fn max_rep_level(&self) -> i16 {
        self.max_rep_level
    }

    /// Returns a reference to the current value.
    /// Does not advance the iterator, so it can be called multiple times.
    ///
    /// # Panics
    ///
    /// Panics if the current definition level differs from the maximum
    /// definition level (i.e. the current value is null).
    #[inline]
    fn current_value(&self) -> &T::T {
        assert!(
            self.current_def_level() == self.max_def_level(),
            "Cannot extract value, max definition level: {}, current level: {}",
            self.max_def_level(),
            self.current_def_level()
        );
        &self.values[self.curr_triplet_index]
    }

    /// Returns the current definition level.
    /// If the column has no definition levels, the maximum definition level is returned.
    #[inline]
    fn current_def_level(&self) -> i16 {
        match self.def_levels {
            Some(ref vec) => vec[self.curr_triplet_index],
            None => self.max_def_level,
        }
    }

    /// Returns the current repetition level.
    /// If the column has no repetition levels, the maximum repetition level is returned.
    #[inline]
    fn current_rep_level(&self) -> i16 {
        match self.rep_levels {
            Some(ref vec) => vec[self.curr_triplet_index],
            None => self.max_rep_level,
        }
    }

    /// Quick check whether the iterator has a current triplet.
    /// It is updated by `read_next`, so the two are always in sync.
    #[inline]
    fn has_next(&self) -> bool {
        self.has_next
    }

    /// Advances to the next triplet, reading a new batch from the column reader
    /// once the current buffer is exhausted.
    ///
    /// Returns `Ok(true)` if a triplet is available, or `Ok(false)` if the column
    /// has no more values or levels to read.
    ///
    /// # Errors
    ///
    /// Returns an error if the column reader fails, or if the values read cannot
    /// be lined up with the definition levels read (e.g. a corrupt page that
    /// decodes fewer values than its definition levels require).
    fn read_next(&mut self) -> Result<bool> {
        self.curr_triplet_index += 1;
        if self.curr_triplet_index >= self.triplets_left {
            let (values_read, levels_read) = {
                // Get slice of definition levels, if available
                let def_levels = self.def_levels.as_mut().map(|vec| &mut vec[..]);
                // Get slice of repetition levels, if available
                let rep_levels = self.rep_levels.as_mut().map(|vec| &mut vec[..]);
                // Buffer triplets
                self.reader.read_batch(
                    self.batch_size,
                    def_levels,
                    rep_levels,
                    &mut self.values,
                )?
            };
            // No more values or levels to read
            if values_read == 0 && levels_read == 0 {
                self.has_next = false;
                return Ok(false);
            }
            // Single constructor for every "values and levels do not line up" failure.
            let spacing_err = || {
                general_err!(
                    "Spacing of values/levels is wrong, values_read: {}, levels_read: {}",
                    values_read,
                    levels_read
                )
            };
            if levels_read == 0 || values_read == levels_read {
                // There are no definition levels to read (column is required),
                // or definition levels match values, so no spacing is needed.
                self.curr_triplet_index = 0;
                self.triplets_left = values_read;
            } else if values_read < levels_read {
                // Spread the densely packed values out so that each one sits at the
                // position whose definition level equals the maximum definition level.
                // Walking backwards lets the values be moved in place with swaps.
                let def_levels = match self.def_levels.as_ref() {
                    Some(levels) => levels,
                    // Nulls were reported without definition levels to place them.
                    None => return Err(spacing_err()),
                };
                // One past the last value that still has to be placed.
                let mut next_value = values_read;
                for pos in (0..levels_read).rev() {
                    if def_levels[pos] == self.max_def_level {
                        // More non-null slots than values read means the data is
                        // inconsistent; report it rather than underflowing.
                        next_value = next_value.checked_sub(1).ok_or_else(spacing_err)?;
                        self.values.swap(pos, next_value);
                    }
                }
                if next_value != 0 {
                    // Some values were left without a non-null slot.
                    return Err(spacing_err());
                }
                self.curr_triplet_index = 0;
                self.triplets_left = levels_read;
            } else {
                // Values are never expected to outnumber levels.
                return Err(spacing_err());
            }
        }
        self.has_next = true;
        Ok(true)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::schema::types::ColumnPath;
    use crate::util::test_common::get_test_file;

    #[test]
    #[should_panic(expected = "Expected positive batch size, found: 0")]
    fn test_triplet_zero_batch_size() {
        let column_path =
            ColumnPath::from(vec!["b_struct".to_string(), "b_c_int".to_string()]);
        test_column_in_file("nulls.snappy.parquet", 0, &column_path, &[], &[], &[]);
    }

    #[test]
    fn test_triplet_null_column() {
        let path = vec!["b_struct", "b_c_int"];
        let values = vec![];
        let def_levels = vec![1, 1, 1, 1, 1, 1, 1, 1];
        let rep_levels = vec![0, 0, 0, 0, 0, 0, 0, 0];
        test_triplet_iter(
            "nulls.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_required_column() {
        let path = vec!["ID"];
        let values = vec![Field::Long(8)];
        let def_levels = vec![0];
        let rep_levels = vec![0];
        test_triplet_iter(
            "nonnullable.impala.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_column() {
        let path = vec!["nested_struct", "A"];
        let values = vec![Field::Int(1), Field::Int(7)];
        let def_levels = vec![2, 1, 1, 1, 1, 0, 2];
        let rep_levels = vec![0, 0, 0, 0, 0, 0, 0];
        test_triplet_iter(
            "nullable.impala.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_list_column() {
        let path = vec!["a", "list", "element", "list", "element", "list", "element"];
        let values = vec![
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("e".to_string()),
            Field::Str("a".to_string()),
            Field::Str("b".to_string()),
            Field::Str("c".to_string()),
            Field::Str("d".to_string()),
            Field::Str("e".to_string()),
            Field::Str("f".to_string()),
        ];
        let def_levels = vec![7, 7, 7, 4, 7, 7, 7, 7, 7, 4, 7, 7, 7, 7, 7, 7, 4, 7];
        let rep_levels = vec![0, 3, 2, 1, 2, 0, 3, 2, 3, 1, 2, 0, 3, 2, 3, 2, 1, 2];
        test_triplet_iter(
            "nested_lists.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    #[test]
    fn test_triplet_optional_map_column() {
        let path = vec!["a", "key_value", "value", "key_value", "key"];
        let values = vec![
            Field::Int(1),
            Field::Int(2),
            Field::Int(1),
            Field::Int(1),
            Field::Int(3),
            Field::Int(4),
            Field::Int(5),
        ];
        let def_levels = vec![4, 4, 4, 2, 3, 4, 4, 4, 4];
        let rep_levels = vec![0, 2, 0, 0, 0, 0, 0, 2, 2];
        test_triplet_iter(
            "nested_maps.snappy.parquet",
            path,
            &values,
            &def_levels,
            &rep_levels,
        );
    }

    // Check triplet iterator across different batch sizes
    fn test_triplet_iter(
        file_name: &str,
        column_path: Vec<&str>,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        // Convert path into column path
        let path: Vec<String> = column_path.iter().map(|x| x.to_string()).collect();
        let column_path = ColumnPath::from(path);
        let batch_sizes = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 128, 256];
        for batch_size in batch_sizes {
            test_column_in_file(
                file_name,
                batch_size,
                &column_path,
                expected_values,
                expected_def_levels,
                expected_rep_levels,
            );
        }
    }

    // Check values of a selected column in a file.
    // Fails if the column does not exist, so a mistyped path cannot make a test
    // pass without checking anything.
    fn test_column_in_file(
        file_name: &str,
        batch_size: usize,
        column_path: &ColumnPath,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let file = get_test_file(file_name);
        let file_reader = SerializedFileReader::new(file).unwrap();
        let metadata = file_reader.metadata();
        // Get schema descriptor
        let file_metadata = metadata.file_metadata();
        let schema = file_metadata.schema_descr();
        // Get first row group
        let row_group_reader = file_reader.get_row_group(0).unwrap();
        let mut found = false;
        for i in 0..schema.num_columns() {
            let descr = schema.column(i);
            if descr.path() == column_path {
                found = true;
                let reader = row_group_reader.get_column_reader(i).unwrap();
                test_triplet_column(
                    descr,
                    reader,
                    batch_size,
                    expected_values,
                    expected_def_levels,
                    expected_rep_levels,
                );
            }
        }
        assert!(
            found,
            "Column {:?} not found in file {}",
            column_path, file_name
        );
    }

    // Check values for individual triplet iterator
    fn test_triplet_column(
        descr: ColumnDescPtr,
        reader: ColumnReader,
        batch_size: usize,
        expected_values: &[Field],
        expected_def_levels: &[i16],
        expected_rep_levels: &[i16],
    ) {
        let mut iter = TripletIter::new(descr.clone(), reader, batch_size);
        let mut values: Vec<Field> = Vec::new();
        let mut def_levels: Vec<i16> = Vec::new();
        let mut rep_levels: Vec<i16> = Vec::new();
        assert_eq!(iter.max_def_level(), descr.max_def_level());
        assert_eq!(iter.max_rep_level(), descr.max_rep_level());
        // Surface reader errors instead of silently ending the loop on `Err`.
        while iter.read_next().expect("Failed to read next triplet") {
            assert!(iter.has_next());
            if !iter.is_null() {
                values.push(iter.current_value());
            }
            def_levels.push(iter.current_def_level());
            rep_levels.push(iter.current_rep_level());
        }
        // Once exhausted, `has_next` must agree with `read_next`.
        assert!(!iter.has_next());
        assert_eq!(values, expected_values);
        assert_eq!(def_levels, expected_def_levels);
        assert_eq!(rep_levels, expected_rep_levels);
    }
}