| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| use std::cmp::min; |
| use std::collections::{HashMap, HashSet}; |
| use std::marker::PhantomData; |
| use std::mem::size_of; |
| use std::mem::transmute; |
| use std::rc::Rc; |
| use std::result::Result::Ok; |
| use std::slice::from_raw_parts_mut; |
| use std::sync::Arc; |
| use std::vec::Vec; |
| |
| use arrow::array::{ |
| ArrayDataBuilder, ArrayDataRef, ArrayRef, BooleanBufferBuilder, BufferBuilderTrait, |
| Int16BufferBuilder, StructArray, |
| }; |
| use arrow::buffer::{Buffer, MutableBuffer}; |
| use arrow::datatypes::{DataType as ArrowType, Field}; |
| |
| use crate::arrow::converter::{ |
| BooleanConverter, Converter, Float32Converter, Float64Converter, Int16Converter, |
| Int32Converter, Int64Converter, Int8Converter, UInt16Converter, UInt32Converter, |
| UInt64Converter, UInt8Converter, |
| }; |
| use crate::arrow::record_reader::RecordReader; |
| use crate::arrow::schema::parquet_to_arrow_field; |
| use crate::basic::{Repetition, Type as PhysicalType}; |
| use crate::column::page::PageIterator; |
| use crate::data_type::{ |
| BoolType, ByteArrayType, DataType, DoubleType, FloatType, Int32Type, Int64Type, |
| Int96Type, |
| }; |
| use crate::errors::{ParquetError, ParquetError::ArrowError, Result}; |
| use crate::file::reader::{FilePageIterator, FileReader}; |
| use crate::schema::types::{ |
| ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, Type, TypePtr, |
| }; |
| use crate::schema::visitor::TypeVisitor; |
| use std::any::Any; |
| |
/// Array reader reads parquet data into arrow array.
pub trait ArrayReader {
    /// Returns the reader as [`Any`] so callers can downcast to a concrete
    /// reader implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the arrow type of this array reader.
    fn get_data_type(&self) -> &ArrowType;

    /// Reads at most `batch_size` records into an arrow array and return it.
    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef>;

    /// Returns the definition levels of data from last call of `next_batch`.
    /// The result is used by parent array reader to calculate its own definition
    /// levels and repetition levels, so that its parent can calculate null bitmap.
    fn get_def_levels(&self) -> Option<&[i16]>;

    /// Return the repetition levels of data from last call of `next_batch`.
    /// The result is used by parent array reader to calculate its own definition
    /// levels and repetition levels, so that its parent can calculate null bitmap.
    fn get_rep_levels(&self) -> Option<&[i16]>;
}
| |
/// Primitive array readers are leaves of array reader tree. They accept page iterator
/// and read them into primitive arrays.
pub struct PrimitiveArrayReader<T: DataType> {
    // Arrow data type produced by this reader.
    data_type: ArrowType,
    // Source of page readers, one per column chunk.
    pages: Box<dyn PageIterator>,
    // Definition levels from the last `next_batch`, saved for the parent reader.
    def_levels_buffer: Option<Buffer>,
    // Repetition levels from the last `next_batch`, saved for the parent reader.
    rep_levels_buffer: Option<Buffer>,
    // Parquet column descriptor of the column being read.
    column_desc: ColumnDescPtr,
    // Underlying record reader that decodes pages into values and levels.
    record_reader: RecordReader<T>,
    // Marker for the parquet physical type `T`.
    _type_marker: PhantomData<T>,
}
| |
| impl<T: DataType> PrimitiveArrayReader<T> { |
| /// Construct primitive array reader. |
| pub fn new( |
| mut pages: Box<dyn PageIterator>, |
| column_desc: ColumnDescPtr, |
| ) -> Result<Self> { |
| let data_type = parquet_to_arrow_field(column_desc.clone())? |
| .data_type() |
| .clone(); |
| |
| let mut record_reader = RecordReader::<T>::new(column_desc.clone()); |
| record_reader.set_page_reader( |
| pages |
| .next() |
| .ok_or_else(|| general_err!("Can't build array without pages!"))??, |
| )?; |
| |
| Ok(Self { |
| data_type, |
| pages, |
| def_levels_buffer: None, |
| rep_levels_buffer: None, |
| column_desc, |
| record_reader, |
| _type_marker: PhantomData, |
| }) |
| } |
| } |
| |
| /// Implementation of primitive array reader. |
| impl<T: DataType> ArrayReader for PrimitiveArrayReader<T> { |
| fn as_any(&self) -> &dyn Any { |
| self |
| } |
| |
| /// Returns data type of primitive array. |
| fn get_data_type(&self) -> &ArrowType { |
| &self.data_type |
| } |
| |
| /// Reads at most `batch_size` records into array. |
| fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> { |
| let mut records_read = 0usize; |
| while records_read < batch_size { |
| let records_to_read = batch_size - records_read; |
| |
| let records_read_once = self.record_reader.read_records(records_to_read)?; |
| records_read = records_read + records_read_once; |
| |
| // Record reader exhausted |
| if records_read_once < records_to_read { |
| if let Some(page_reader) = self.pages.next() { |
| // Read from new page reader |
| self.record_reader.set_page_reader(page_reader?)?; |
| } else { |
| // Page reader also exhausted |
| break; |
| } |
| } |
| } |
| |
| // convert to arrays |
| let array = match (&self.data_type, T::get_physical_type()) { |
| (ArrowType::Boolean, PhysicalType::BOOLEAN) => unsafe { |
| BooleanConverter::convert(transmute::< |
| &mut RecordReader<T>, |
| &mut RecordReader<BoolType>, |
| >(&mut self.record_reader)) |
| }, |
| (ArrowType::Int8, PhysicalType::INT32) => unsafe { |
| Int8Converter::convert(transmute::< |
| &mut RecordReader<T>, |
| &mut RecordReader<Int32Type>, |
| >(&mut self.record_reader)) |
| }, |
| (ArrowType::Int16, PhysicalType::INT32) => unsafe { |
| Int16Converter::convert(transmute::< |
| &mut RecordReader<T>, |
| &mut RecordReader<Int32Type>, |
| >(&mut self.record_reader)) |
| }, |
| (ArrowType::Int32, PhysicalType::INT32) => unsafe { |
| Int32Converter::convert(transmute::< |
| &mut RecordReader<T>, |
| &mut RecordReader<Int32Type>, |
| >(&mut self.record_reader)) |
| }, |
| (ArrowType::UInt8, PhysicalType::INT32) => unsafe { |
| UInt8Converter::convert(transmute::< |
| &mut RecordReader<T>, |
| &mut RecordReader<Int32Type>, |
| >(&mut self.record_reader)) |
| }, |
| (ArrowType::UInt16, PhysicalType::INT32) => unsafe { |
| UInt16Converter::convert(transmute::< |
| &mut RecordReader<T>, |
| &mut RecordReader<Int32Type>, |
| >(&mut self.record_reader)) |
| }, |
| (ArrowType::UInt32, PhysicalType::INT32) => unsafe { |
| UInt32Converter::convert(transmute::< |
| &mut RecordReader<T>, |
| &mut RecordReader<Int32Type>, |
| >(&mut self.record_reader)) |
| }, |
| (ArrowType::Int64, PhysicalType::INT64) => unsafe { |
| Int64Converter::convert(transmute::< |
| &mut RecordReader<T>, |
| &mut RecordReader<Int64Type>, |
| >(&mut self.record_reader)) |
| }, |
| (ArrowType::UInt64, PhysicalType::INT64) => unsafe { |
| UInt64Converter::convert(transmute::< |
| &mut RecordReader<T>, |
| &mut RecordReader<Int64Type>, |
| >(&mut self.record_reader)) |
| }, |
| (ArrowType::Float32, PhysicalType::FLOAT) => unsafe { |
| Float32Converter::convert(transmute::< |
| &mut RecordReader<T>, |
| &mut RecordReader<FloatType>, |
| >(&mut self.record_reader)) |
| }, |
| (ArrowType::Float64, PhysicalType::DOUBLE) => unsafe { |
| Float64Converter::convert(transmute::< |
| &mut RecordReader<T>, |
| &mut RecordReader<DoubleType>, |
| >(&mut self.record_reader)) |
| }, |
| (arrow_type, _) => Err(general_err!( |
| "Reading {:?} type from parquet is not supported yet.", |
| arrow_type |
| )), |
| }?; |
| |
| // save definition and repetition buffers |
| self.def_levels_buffer = self.record_reader.consume_def_levels()?; |
| self.rep_levels_buffer = self.record_reader.consume_rep_levels()?; |
| self.record_reader.reset(); |
| Ok(array) |
| } |
| |
| fn get_def_levels(&self) -> Option<&[i16]> { |
| self.def_levels_buffer.as_ref().map(|buf| buf.typed_data()) |
| } |
| |
| fn get_rep_levels(&self) -> Option<&[i16]> { |
| self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data()) |
| } |
| } |
| |
/// Implementation of struct array reader.
pub struct StructArrayReader {
    // Readers for each child field of the struct.
    children: Vec<Box<dyn ArrayReader>>,
    // Arrow struct type produced by this reader.
    data_type: ArrowType,
    // Definition level of the struct node itself.
    struct_def_level: i16,
    // Repetition level of the struct node itself.
    struct_rep_level: i16,
    // Definition levels from the last `next_batch`, saved for the parent reader.
    def_level_buffer: Option<Buffer>,
    // Repetition levels from the last `next_batch`, saved for the parent reader.
    rep_level_buffer: Option<Buffer>,
}
| |
| impl StructArrayReader { |
| /// Construct struct array reader. |
| pub fn new( |
| data_type: ArrowType, |
| children: Vec<Box<dyn ArrayReader>>, |
| def_level: i16, |
| rep_level: i16, |
| ) -> Self { |
| Self { |
| data_type, |
| children, |
| struct_def_level: def_level, |
| struct_rep_level: rep_level, |
| def_level_buffer: None, |
| rep_level_buffer: None, |
| } |
| } |
| } |
| |
| impl ArrayReader for StructArrayReader { |
| fn as_any(&self) -> &dyn Any { |
| self |
| } |
| |
| /// Returns data type. |
| /// This must be a struct. |
| fn get_data_type(&self) -> &ArrowType { |
| &self.data_type |
| } |
| |
| /// Read `batch_size` struct records. |
| /// |
| /// Definition levels of struct array is calculated as following: |
| /// ```ignore |
| /// def_levels[i] = min(child1_def_levels[i], child2_def_levels[i], ..., |
| /// childn_def_levels[i]); |
| /// ``` |
| /// |
| /// Repetition levels of struct array is calculated as following: |
| /// ```ignore |
| /// rep_levels[i] = child1_rep_levels[i]; |
| /// ``` |
| /// |
| /// The null bitmap of struct array is calculated from def_levels: |
| /// ```ignore |
| /// null_bitmap[i] = (def_levels[i] >= self.def_level); |
| /// ``` |
| fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> { |
| if self.children.len() == 0 { |
| self.def_level_buffer = None; |
| self.rep_level_buffer = None; |
| return Ok(Arc::new(StructArray::from(Vec::new()))); |
| } |
| |
| let children_array = self |
| .children |
| .iter_mut() |
| .map(|reader| reader.next_batch(batch_size)) |
| .try_fold( |
| Vec::new(), |
| |mut result, child_array| -> Result<Vec<ArrayRef>> { |
| result.push(child_array?); |
| Ok(result) |
| }, |
| )?; |
| |
| // check that array child data has same size |
| let children_array_len = |
| children_array.first().map(|arr| arr.len()).ok_or_else(|| { |
| general_err!("Struct array reader should have at least one child!") |
| })?; |
| |
| let all_children_len_eq = children_array |
| .iter() |
| .all(|arr| arr.len() == children_array_len); |
| if !all_children_len_eq { |
| return Err(general_err!("Not all children array length are the same!")); |
| } |
| |
| // calculate struct def level data |
| let buffer_size = children_array_len * size_of::<i16>(); |
| let mut def_level_data_buffer = MutableBuffer::new(buffer_size); |
| def_level_data_buffer.resize(buffer_size)?; |
| |
| let def_level_data = unsafe { |
| let ptr = transmute::<*const u8, *mut i16>(def_level_data_buffer.raw_data()); |
| from_raw_parts_mut(ptr, children_array_len) |
| }; |
| |
| def_level_data |
| .iter_mut() |
| .for_each(|v| *v = self.struct_def_level); |
| |
| for child in &self.children { |
| if let Some(current_child_def_levels) = child.get_def_levels() { |
| if current_child_def_levels.len() != children_array_len { |
| return Err(general_err!("Child array length are not equal!")); |
| } else { |
| for i in 0..children_array_len { |
| def_level_data[i] = |
| min(def_level_data[i], current_child_def_levels[i]); |
| } |
| } |
| } |
| } |
| |
| // calculate bitmap for current array |
| let mut bitmap_builder = BooleanBufferBuilder::new(children_array_len); |
| let mut null_count = 0; |
| for def_level in def_level_data { |
| let not_null = *def_level >= self.struct_def_level; |
| if !not_null { |
| null_count += 1; |
| } |
| bitmap_builder.append(not_null)?; |
| } |
| |
| // Now we can build array data |
| let array_data = ArrayDataBuilder::new(self.data_type.clone()) |
| .len(children_array_len) |
| .null_count(null_count) |
| .null_bit_buffer(bitmap_builder.finish()) |
| .child_data( |
| children_array |
| .iter() |
| .map(|x| x.data()) |
| .collect::<Vec<ArrayDataRef>>(), |
| ) |
| .build(); |
| |
| // calculate struct rep level data, since struct doesn't add to repetition |
| // levels, here we just need to keep repetition levels of first array |
| // TODO: Verify that all children array reader has same repetition levels |
| let rep_level_data = self |
| .children |
| .first() |
| .ok_or_else(|| { |
| general_err!("Struct array reader should have at least one child!") |
| })? |
| .get_rep_levels() |
| .map(|data| -> Result<Buffer> { |
| let mut buffer = Int16BufferBuilder::new(children_array_len); |
| buffer.append_slice(data)?; |
| Ok(buffer.finish()) |
| }) |
| .transpose()?; |
| |
| self.def_level_buffer = Some(def_level_data_buffer.freeze()); |
| self.rep_level_buffer = rep_level_data; |
| Ok(Arc::new(StructArray::from(array_data))) |
| } |
| |
| fn get_def_levels(&self) -> Option<&[i16]> { |
| self.def_level_buffer.as_ref().map(|buf| buf.typed_data()) |
| } |
| |
| fn get_rep_levels(&self) -> Option<&[i16]> { |
| self.rep_level_buffer.as_ref().map(|buf| buf.typed_data()) |
| } |
| } |
| |
| /// Create array reader from parquet schema, column indices, and parquet file reader. |
| pub fn build_array_reader<T>( |
| parquet_schema: SchemaDescPtr, |
| column_indices: T, |
| file_reader: Rc<dyn FileReader>, |
| ) -> Result<Box<dyn ArrayReader>> |
| where |
| T: IntoIterator<Item = usize>, |
| { |
| let mut base_nodes = Vec::new(); |
| let mut base_nodes_set = HashSet::new(); |
| let mut leaves = HashMap::<*const Type, usize>::new(); |
| |
| for c in column_indices { |
| let column = parquet_schema.column(c).self_type() as *const Type; |
| let root = parquet_schema.get_column_root_ptr(c); |
| let root_raw_ptr = root.clone().as_ref() as *const Type; |
| |
| leaves.insert(column, c); |
| if !base_nodes_set.contains(&root_raw_ptr) { |
| base_nodes.push(root); |
| base_nodes_set.insert(root_raw_ptr); |
| } |
| } |
| |
| if leaves.is_empty() { |
| return Err(general_err!("Can't build array reader without columns!")); |
| } |
| |
| ArrayReaderBuilder::new( |
| Rc::new(parquet_schema.root_schema().clone()), |
| Rc::new(leaves), |
| file_reader, |
| ) |
| .build_array_reader() |
| } |
| |
/// Used to build array reader.
struct ArrayReaderBuilder {
    // Root parquet schema type that the visitor walks.
    root_schema: TypePtr,
    // Key: columns that need to be included in final array builder
    // (identified by the pointer of the leaf `Type` node in the schema tree)
    // Value: column index in schema
    columns_included: Rc<HashMap<*const Type, usize>>,
    // Reader used to create page iterators for each included column.
    file_reader: Rc<dyn FileReader>,
}
| |
/// Used in type visitor.
#[derive(Clone)]
struct ArrayReaderBuilderContext {
    // Definition level accumulated along the path from the schema root.
    def_level: i16,
    // Repetition level accumulated along the path from the schema root.
    rep_level: i16,
    // Column path from the schema root down to the current node.
    path: ColumnPath,
}
| |
| impl Default for ArrayReaderBuilderContext { |
| fn default() -> Self { |
| Self { |
| def_level: 0i16, |
| rep_level: 0i16, |
| path: ColumnPath::new(Vec::new()), |
| } |
| } |
| } |
| |
| /// Create array reader by visiting schema. |
| impl<'a> TypeVisitor<Option<Box<dyn ArrayReader>>, &'a ArrayReaderBuilderContext> |
| for ArrayReaderBuilder |
| { |
| /// Build array reader for primitive type. |
| /// Currently we don't have a list reader implementation, so repeated type is not |
| /// supported yet. |
| fn visit_primitive( |
| &mut self, |
| cur_type: TypePtr, |
| context: &'a ArrayReaderBuilderContext, |
| ) -> Result<Option<Box<dyn ArrayReader>>> { |
| if self.is_included(cur_type.as_ref()) { |
| let mut new_context = context.clone(); |
| new_context.path.append(vec![cur_type.name().to_string()]); |
| |
| match cur_type.get_basic_info().repetition() { |
| Repetition::REPEATED => { |
| new_context.def_level += 1; |
| new_context.rep_level += 1; |
| } |
| Repetition::OPTIONAL => { |
| new_context.def_level += 1; |
| } |
| _ => (), |
| } |
| |
| let reader = |
| self.build_for_primitive_type_inner(cur_type.clone(), &new_context)?; |
| |
| if cur_type.get_basic_info().repetition() == Repetition::REPEATED { |
| Err(ArrowError( |
| "Reading repeated field is not supported yet!".to_string(), |
| )) |
| } else { |
| Ok(Some(reader)) |
| } |
| } else { |
| Ok(None) |
| } |
| } |
| |
| /// Build array reader for struct type. |
| fn visit_struct( |
| &mut self, |
| cur_type: Rc<Type>, |
| context: &'a ArrayReaderBuilderContext, |
| ) -> Result<Option<Box<ArrayReader>>> { |
| let mut new_context = context.clone(); |
| new_context.path.append(vec![cur_type.name().to_string()]); |
| |
| if cur_type.get_basic_info().has_repetition() { |
| match cur_type.get_basic_info().repetition() { |
| Repetition::REPEATED => { |
| new_context.def_level += 1; |
| new_context.rep_level += 1; |
| } |
| Repetition::OPTIONAL => { |
| new_context.def_level += 1; |
| } |
| _ => (), |
| } |
| } |
| |
| if let Some(reader) = |
| self.build_for_struct_type_inner(cur_type.clone(), &new_context)? |
| { |
| if cur_type.get_basic_info().has_repetition() |
| && cur_type.get_basic_info().repetition() == Repetition::REPEATED |
| { |
| Err(ArrowError( |
| "Reading repeated field is not supported yet!".to_string(), |
| )) |
| } else { |
| Ok(Some(reader)) |
| } |
| } else { |
| Ok(None) |
| } |
| } |
| |
| /// Build array reader for map type. |
| /// Currently this is not supported. |
| fn visit_map( |
| &mut self, |
| _cur_type: Rc<Type>, |
| _context: &'a ArrayReaderBuilderContext, |
| ) -> Result<Option<Box<dyn ArrayReader>>> { |
| Err(ArrowError( |
| "Reading parquet map array into arrow is not supported yet!".to_string(), |
| )) |
| } |
| |
| /// Build array reader for list type. |
| /// Currently this is not supported. |
| fn visit_list_with_item( |
| &mut self, |
| _list_type: Rc<Type>, |
| _item_type: &Type, |
| _context: &'a ArrayReaderBuilderContext, |
| ) -> Result<Option<Box<dyn ArrayReader>>> { |
| Err(ArrowError( |
| "Reading parquet list array into arrow is not supported yet!".to_string(), |
| )) |
| } |
| } |
| |
| impl<'a> ArrayReaderBuilder { |
| /// Construct array reader builder. |
| fn new( |
| root_schema: TypePtr, |
| columns_included: Rc<HashMap<*const Type, usize>>, |
| file_reader: Rc<dyn FileReader>, |
| ) -> Self { |
| Self { |
| root_schema, |
| columns_included, |
| file_reader, |
| } |
| } |
| |
| /// Main entry point. |
| fn build_array_reader(&mut self) -> Result<Box<dyn ArrayReader>> { |
| let context = ArrayReaderBuilderContext::default(); |
| |
| self.visit_struct(self.root_schema.clone(), &context) |
| .and_then(|reader_opt| { |
| reader_opt.ok_or_else(|| general_err!("Failed to build array reader!")) |
| }) |
| } |
| |
| // Utility functions |
| |
| /// Check whether one column in included in this array reader builder. |
| fn is_included(&self, t: &Type) -> bool { |
| self.columns_included.contains_key(&(t as *const Type)) |
| } |
| |
| /// Creates primitive array reader for each primitive type. |
| fn build_for_primitive_type_inner( |
| &self, |
| cur_type: TypePtr, |
| context: &'a ArrayReaderBuilderContext, |
| ) -> Result<Box<dyn ArrayReader>> { |
| let column_desc = Rc::new(ColumnDescriptor::new( |
| cur_type.clone(), |
| Some(self.root_schema.clone()), |
| context.def_level, |
| context.rep_level, |
| context.path.clone(), |
| )); |
| let page_iterator = Box::new(FilePageIterator::new( |
| self.columns_included[&(cur_type.as_ref() as *const Type)], |
| self.file_reader.clone(), |
| )?); |
| |
| match cur_type.get_physical_type() { |
| PhysicalType::BOOLEAN => Ok(Box::new(PrimitiveArrayReader::<BoolType>::new( |
| page_iterator, |
| column_desc, |
| )?)), |
| PhysicalType::INT32 => Ok(Box::new(PrimitiveArrayReader::<Int32Type>::new( |
| page_iterator, |
| column_desc, |
| )?)), |
| PhysicalType::INT64 => Ok(Box::new(PrimitiveArrayReader::<Int64Type>::new( |
| page_iterator, |
| column_desc, |
| )?)), |
| PhysicalType::INT96 => Ok(Box::new(PrimitiveArrayReader::<Int96Type>::new( |
| page_iterator, |
| column_desc, |
| )?)), |
| PhysicalType::FLOAT => Ok(Box::new(PrimitiveArrayReader::<FloatType>::new( |
| page_iterator, |
| column_desc, |
| )?)), |
| PhysicalType::DOUBLE => Ok(Box::new( |
| PrimitiveArrayReader::<DoubleType>::new(page_iterator, column_desc)?, |
| )), |
| PhysicalType::BYTE_ARRAY => Ok(Box::new(PrimitiveArrayReader::< |
| ByteArrayType, |
| >::new( |
| page_iterator, column_desc |
| )?)), |
| other => Err(ArrowError(format!( |
| "Unable to create primite array reader for parquet physical type {}", |
| other |
| ))), |
| } |
| } |
| |
| /// Constructs struct array reader without considering repetition. |
| fn build_for_struct_type_inner( |
| &mut self, |
| cur_type: TypePtr, |
| context: &'a ArrayReaderBuilderContext, |
| ) -> Result<Option<Box<dyn ArrayReader>>> { |
| let mut fields = Vec::with_capacity(cur_type.get_fields().len()); |
| let mut children_reader = Vec::with_capacity(cur_type.get_fields().len()); |
| |
| for child in cur_type.get_fields() { |
| if let Some(child_reader) = self.dispatch(child.clone(), context)? { |
| fields.push(Field::new( |
| child.name(), |
| child_reader.get_data_type().clone(), |
| child.is_optional(), |
| )); |
| children_reader.push(child_reader); |
| } |
| } |
| |
| if !fields.is_empty() { |
| let arrow_type = ArrowType::Struct(fields); |
| Ok(Some(Box::new(StructArrayReader::new( |
| arrow_type, |
| children_reader, |
| context.def_level, |
| context.rep_level, |
| )))) |
| } else { |
| Ok(None) |
| } |
| } |
| } |
| |
#[cfg(test)]
mod tests {
    use crate::arrow::array_reader::{
        build_array_reader, ArrayReader, PrimitiveArrayReader, StructArrayReader,
    };
    use crate::basic::Encoding;
    use crate::column::page::Page;
    use crate::data_type::{DataType, Int32Type};
    use crate::errors::Result;
    use crate::file::reader::{FileReader, SerializedFileReader};
    use crate::schema::parser::parse_message_type;
    use crate::schema::types::{ColumnDescPtr, SchemaDescriptor};
    use crate::util::test_common::page_util::InMemoryPageIterator;
    use crate::util::test_common::{get_test_file, make_pages};
    use arrow::array::{Array, ArrayRef, PrimitiveArray, StructArray};
    use arrow::datatypes::{DataType as ArrowType, Field, Int32Type as ArrowInt32};
    use rand::distributions::range::SampleRange;
    use std::any::Any;
    use std::collections::VecDeque;
    use std::rc::Rc;
    use std::sync::Arc;

    /// Generates `num_chunks` column chunks of random data for `column_desc`.
    /// The generated levels and values are appended to `def_levels`,
    /// `rep_levels` and `values`, and one `Vec<Page>` per chunk is appended to
    /// `page_lists`.
    fn make_column_chunks<T: DataType>(
        column_desc: ColumnDescPtr,
        encoding: Encoding,
        num_levels: usize,
        min_value: T::T,
        max_value: T::T,
        def_levels: &mut Vec<i16>,
        rep_levels: &mut Vec<i16>,
        values: &mut Vec<T::T>,
        page_lists: &mut Vec<Vec<Page>>,
        use_v2: bool,
        num_chunks: usize,
    ) where
        T::T: PartialOrd + SampleRange + Copy,
    {
        for _i in 0..num_chunks {
            let mut pages = VecDeque::new();
            let mut data = Vec::new();
            let mut page_def_levels = Vec::new();
            let mut page_rep_levels = Vec::new();

            make_pages::<T>(
                column_desc.clone(),
                encoding,
                1,
                num_levels,
                min_value,
                max_value,
                &mut page_def_levels,
                &mut page_rep_levels,
                &mut data,
                &mut pages,
                use_v2,
            );

            def_levels.append(&mut page_def_levels);
            rep_levels.append(&mut page_rep_levels);
            values.append(&mut data);
            page_lists.push(Vec::from(pages));
        }
    }

    #[test]
    fn test_primitive_array_reader_data() {
        // Construct column schema
        let message_type = "
        message test_schema {
          REQUIRED INT32 leaf;
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        // Construct page iterator
        {
            let mut data = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                1,
                200,
                &mut Vec::new(),
                &mut Vec::new(),
                &mut data,
                &mut page_lists,
                true,
                2,
            );
            let page_iterator = InMemoryPageIterator::new(
                schema.clone(),
                column_desc.clone(),
                page_lists,
            );

            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
                Box::new(page_iterator),
                column_desc.clone(),
            )
            .unwrap();

            // Read first 50 values, which are all from the first column chunk
            let array = array_reader.next_batch(50).unwrap();
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
                .unwrap();

            assert_eq!(
                &PrimitiveArray::<ArrowInt32>::from(
                    data[0..50].iter().cloned().collect::<Vec<i32>>()
                ),
                array
            );

            // Read next 100 values, the first 50 ones are from the first column chunk,
            // and the last 50 ones are from the second column chunk
            let array = array_reader.next_batch(100).unwrap();
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
                .unwrap();

            assert_eq!(
                &PrimitiveArray::<ArrowInt32>::from(
                    data[50..150].iter().cloned().collect::<Vec<i32>>()
                ),
                array
            );

            // Try to read 100 values, however there are only 50 values
            let array = array_reader.next_batch(100).unwrap();
            let array = array
                .as_any()
                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
                .unwrap();

            assert_eq!(
                &PrimitiveArray::<ArrowInt32>::from(
                    data[150..200].iter().cloned().collect::<Vec<i32>>()
                ),
                array
            );
        }
    }

    #[test]
    fn test_primitive_array_reader_def_and_rep_levels() {
        // Construct column schema
        let message_type = "
        message test_schema {
            REPEATED Group test_mid {
                OPTIONAL INT32 leaf;
            }
        }
        ";

        let schema = parse_message_type(message_type)
            .map(|t| Rc::new(SchemaDescriptor::new(Rc::new(t))))
            .unwrap();

        let column_desc = schema.column(0);

        // Construct page iterator
        {
            let mut def_levels = Vec::new();
            let mut rep_levels = Vec::new();
            let mut page_lists = Vec::new();
            make_column_chunks::<Int32Type>(
                column_desc.clone(),
                Encoding::PLAIN,
                100,
                1,
                200,
                &mut def_levels,
                &mut rep_levels,
                &mut Vec::new(),
                &mut page_lists,
                true,
                2,
            );

            let page_iterator = InMemoryPageIterator::new(
                schema.clone(),
                column_desc.clone(),
                page_lists,
            );

            let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
                Box::new(page_iterator),
                column_desc.clone(),
            )
            .unwrap();

            let mut accu_len: usize = 0;

            // Read first 50 values, which are all from the first column chunk
            let array = array_reader.next_batch(50).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
            accu_len += array.len();

            // Read next 100 values, the first 50 ones are from the first column chunk,
            // and the last 50 ones are from the second column chunk
            let array = array_reader.next_batch(100).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
            accu_len += array.len();

            // Try to read 100 values, however there are only 50 values
            let array = array_reader.next_batch(100).unwrap();
            assert_eq!(
                Some(&def_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_def_levels()
            );
            assert_eq!(
                Some(&rep_levels[accu_len..(accu_len + array.len())]),
                array_reader.get_rep_levels()
            );
        }
    }

    /// Array reader for test.
    struct InMemoryArrayReader {
        data_type: ArrowType,
        array: ArrayRef,
        def_levels: Option<Vec<i16>>,
        rep_levels: Option<Vec<i16>>,
    }

    impl InMemoryArrayReader {
        pub fn new(
            data_type: ArrowType,
            array: ArrayRef,
            def_levels: Option<Vec<i16>>,
            rep_levels: Option<Vec<i16>>,
        ) -> Self {
            Self {
                data_type,
                array,
                def_levels,
                rep_levels,
            }
        }
    }

    impl ArrayReader for InMemoryArrayReader {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn get_data_type(&self) -> &ArrowType {
            &self.data_type
        }

        fn next_batch(&mut self, _batch_size: usize) -> Result<ArrayRef> {
            Ok(self.array.clone())
        }

        fn get_def_levels(&self) -> Option<&[i16]> {
            self.def_levels.as_ref().map(|v| v.as_slice())
        }

        fn get_rep_levels(&self) -> Option<&[i16]> {
            self.rep_levels.as_ref().map(|v| v.as_slice())
        }
    }

    #[test]
    fn test_struct_array_reader() {
        let array_1 = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![1, 2, 3, 4, 5]));
        let array_reader_1 = InMemoryArrayReader::new(
            ArrowType::Int32,
            array_1.clone(),
            Some(vec![0, 1, 2, 3, 1]),
            Some(vec![1, 1, 1, 1, 1]),
        );

        let array_2 = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![5, 4, 3, 2, 1]));
        let array_reader_2 = InMemoryArrayReader::new(
            ArrowType::Int32,
            array_2.clone(),
            Some(vec![0, 1, 3, 1, 2]),
            Some(vec![1, 1, 1, 1, 1]),
        );

        let struct_type = ArrowType::Struct(vec![
            Field::new("f1", array_1.data_type().clone(), true),
            Field::new("f2", array_2.data_type().clone(), true),
        ]);

        let mut struct_array_reader = StructArrayReader::new(
            struct_type,
            vec![Box::new(array_reader_1), Box::new(array_reader_2)],
            1,
            1,
        );

        let struct_array = struct_array_reader.next_batch(5).unwrap();
        let struct_array = struct_array.as_any().downcast_ref::<StructArray>().unwrap();

        assert_eq!(5, struct_array.len());
        assert_eq!(
            vec![true, false, false, false, false],
            (0..5)
                .map(|idx| struct_array.data_ref().is_null(idx))
                .collect::<Vec<bool>>()
        );
        assert_eq!(
            Some(vec![0, 1, 1, 1, 1].as_slice()),
            struct_array_reader.get_def_levels()
        );
        assert_eq!(
            Some(vec![1, 1, 1, 1, 1].as_slice()),
            struct_array_reader.get_rep_levels()
        );
    }

    #[test]
    fn test_create_array_reader() {
        let file = get_test_file("nulls.snappy.parquet");
        let file_reader = Rc::new(SerializedFileReader::new(file).unwrap());

        let array_reader = build_array_reader(
            file_reader.metadata().file_metadata().schema_descr_ptr(),
            vec![0usize].into_iter(),
            file_reader,
        )
        .unwrap();

        // Create arrow types
        let arrow_type = ArrowType::Struct(vec![Field::new(
            "b_struct",
            ArrowType::Struct(vec![Field::new("b_c_int", ArrowType::Int32, true)]),
            true,
        )]);

        assert_eq!(array_reader.get_data_type(), &arrow_type);
    }
}