| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
//! Contains the file reader API and provides methods to access file metadata, obtain
//! row group readers for reading individual column chunks, and create record iterators.
| |
| use std::{ |
| convert::TryFrom, |
| fs::File, |
| io::{BufReader, Cursor, Read, Seek, SeekFrom}, |
| path::Path, |
| rc::Rc, |
| }; |
| |
| use byteorder::{ByteOrder, LittleEndian}; |
| use parquet_format::{ |
| ColumnOrder as TColumnOrder, FileMetaData as TFileMetaData, PageHeader, PageType, |
| }; |
| use thrift::protocol::TCompactInputProtocol; |
| |
| use crate::basic::{ColumnOrder, Compression, Encoding, Type}; |
| use crate::column::page::PageIterator; |
| use crate::column::{ |
| page::{Page, PageReader}, |
| reader::{ColumnReader, ColumnReaderImpl}, |
| }; |
| use crate::compression::{create_codec, Codec}; |
| use crate::errors::{ParquetError, Result}; |
| use crate::file::{metadata::*, statistics, FOOTER_SIZE, PARQUET_MAGIC}; |
| use crate::record::reader::RowIter; |
| use crate::record::Row; |
| use crate::schema::types::{ |
| self, ColumnDescPtr, SchemaDescPtr, SchemaDescriptor, Type as SchemaType, |
| }; |
| use crate::util::{io::FileSource, memory::ByteBufferPtr}; |
| |
| // ---------------------------------------------------------------------- |
| // APIs for file & row group readers |
| |
/// Parquet file reader API. With this, a user can get metadata information about the
/// Parquet file, obtain a reader for each row group, and iterate over records.
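///
/// A minimal usage sketch (assuming a Parquet file exists at `data.parquet`):
///
/// ```no_run
/// use std::fs::File;
/// use parquet::file::reader::{FileReader, SerializedFileReader};
///
/// let file = File::open("data.parquet").unwrap();
/// let reader = SerializedFileReader::new(file).unwrap();
/// println!("row groups: {}", reader.num_row_groups());
/// ```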
| pub trait FileReader { |
| /// Get metadata information about this file. |
| fn metadata(&self) -> &ParquetMetaData; |
| |
| /// Get the total number of row groups for this file. |
| fn num_row_groups(&self) -> usize; |
| |
    /// Get the `i`th row group reader. Note that this does not perform a bounds check.
| fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader + '_>>; |
| |
| /// Get full iterator of `Row`s from a file (over all row groups). |
| /// |
| /// Iterator will automatically load the next row group to advance. |
| /// |
    /// The projected schema can be a subset of, or equal to, the file schema. When it
    /// is `None`, the full file schema is assumed.
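    ///
    /// A sketch with a projection (the schema string is a hypothetical example):
    ///
    /// ```no_run
    /// use std::fs::File;
    /// use parquet::file::reader::{FileReader, SerializedFileReader};
    /// use parquet::schema::parser::parse_message_type;
    ///
    /// let reader =
    ///     SerializedFileReader::new(File::open("data.parquet").unwrap()).unwrap();
    /// let proj = parse_message_type("message schema { OPTIONAL INT32 id; }").ok();
    /// for row in reader.get_row_iter(proj).unwrap() {
    ///     println!("{}", row);
    /// }
    /// ```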
| fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>; |
| } |
| |
/// Parquet row group reader API. With this, a user can get metadata information about
/// the row group, as well as readers for each individual column chunk.
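///
/// A minimal sketch (assuming `data.parquet` has at least one row group):
///
/// ```no_run
/// use std::fs::File;
/// use parquet::file::reader::{FileReader, SerializedFileReader};
///
/// let reader = SerializedFileReader::new(File::open("data.parquet").unwrap()).unwrap();
/// let row_group = reader.get_row_group(0).unwrap();
/// println!("columns: {}", row_group.num_columns());
/// ```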
| pub trait RowGroupReader { |
| /// Get metadata information about this row group. |
| fn metadata(&self) -> &RowGroupMetaData; |
| |
| /// Get the total number of column chunks in this row group. |
| fn num_columns(&self) -> usize; |
| |
| /// Get page reader for the `i`th column chunk. |
| fn get_column_page_reader(&self, i: usize) -> Result<Box<PageReader>>; |
| |
| /// Get value reader for the `i`th column chunk. |
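    ///
    /// A sketch of reading raw values (assumes column 0 has physical type INT32 and
    /// is required, so definition and repetition levels can be skipped):
    ///
    /// ```no_run
    /// use std::fs::File;
    /// use parquet::column::reader::ColumnReader;
    /// use parquet::file::reader::{FileReader, SerializedFileReader};
    ///
    /// let reader =
    ///     SerializedFileReader::new(File::open("data.parquet").unwrap()).unwrap();
    /// let row_group = reader.get_row_group(0).unwrap();
    /// if let ColumnReader::Int32ColumnReader(mut r) = row_group.get_column_reader(0).unwrap() {
    ///     let mut values = vec![0i32; 8];
    ///     // Read up to 8 values into the buffer.
    ///     let (num_values, _) = r.read_batch(8, None, None, &mut values).unwrap();
    ///     println!("read {} values", num_values);
    /// }
    /// ```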
| fn get_column_reader(&self, i: usize) -> Result<ColumnReader>; |
| |
| /// Get iterator of `Row`s from this row group. |
| /// |
    /// The projected schema can be a subset of, or equal to, the file schema. When it
    /// is `None`, the full file schema is assumed.
| fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>; |
| } |
| |
| // ---------------------------------------------------------------------- |
| // Serialized impl for file & row group readers |
| |
/// Length returns the total number of bytes that the implementor contains.
/// It is mainly used to locate the metadata, which is at the end of the source.
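///
/// A sketch of an implementation for a hypothetical in-memory source:
///
/// ```
/// use parquet::file::reader::Length;
///
/// struct InMemory(Vec<u8>);
///
/// impl Length for InMemory {
///     fn len(&self) -> u64 {
///         self.0.len() as u64
///     }
/// }
/// ```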
| pub trait Length { |
    /// Returns the length of the inner source in bytes.
| fn len(&self) -> u64; |
| } |
| |
/// TryClone tries to clone the type; implementations should preserve the `Seek`
/// position of the given instance.
| pub trait TryClone: Sized { |
| /// Clones the type returning a new instance or an error if it's not possible |
| /// to clone it. |
| fn try_clone(&self) -> Result<Self>; |
| } |
| |
| impl Length for File { |
| fn len(&self) -> u64 { |
| self.metadata().map(|m| m.len()).unwrap_or(0u64) |
| } |
| } |
| |
| impl TryClone for File { |
| fn try_clone(&self) -> Result<Self> { |
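        // Resolves to the inherent `std::fs::File::try_clone` (inherent methods take
        // precedence over trait methods), so this is not a recursive call.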
| self.try_clone().map_err(|e| e.into()) |
| } |
| } |
| |
| impl<'a> Length for Cursor<&'a [u8]> { |
| fn len(&self) -> u64 { |
| self.get_ref().len() as u64 |
| } |
| } |
| |
| impl<'a> TryClone for Cursor<&'a [u8]> { |
| fn try_clone(&self) -> Result<Self> { |
| Ok(self.clone()) |
| } |
| } |
| |
/// ParquetReader is the interface that a type must implement to be usable as a
/// Parquet source.
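///
/// Both `File` and `Cursor<&[u8]>` (see the impls above) satisfy it, so on-disk files
/// and in-memory buffers can be read. A sketch with an in-memory buffer, which is
/// assumed to hold a complete Parquet file:
///
/// ```no_run
/// use std::io::Cursor;
/// use parquet::file::reader::SerializedFileReader;
///
/// let bytes: &[u8] = &[]; // stand-in for a full Parquet file loaded in memory
/// let reader = SerializedFileReader::new(Cursor::new(bytes)).unwrap();
/// ```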
| pub trait ParquetReader: Read + Seek + Length + TryClone {} |
| impl<T: Read + Seek + Length + TryClone> ParquetReader for T {} |
| |
| /// A serialized implementation for Parquet [`FileReader`]. |
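///
/// A construction sketch using the `TryFrom` conversions defined below (the path is
/// an assumption):
///
/// ```no_run
/// use std::convert::TryFrom;
/// use parquet::file::reader::SerializedFileReader;
///
/// let reader = SerializedFileReader::try_from("data.parquet").unwrap();
/// ```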
| pub struct SerializedFileReader<R: ParquetReader> { |
| buf: BufReader<R>, |
| metadata: ParquetMetaData, |
| } |
| |
| impl<R: ParquetReader> SerializedFileReader<R> { |
    /// Creates a file reader from a Parquet file.
    /// Returns an error if the Parquet file does not exist or is corrupt.
| pub fn new(reader: R) -> Result<Self> { |
| let mut buf = BufReader::new(reader); |
| let metadata = Self::parse_metadata(&mut buf)?; |
| Ok(Self { buf, metadata }) |
| } |
| |
    // Layout of a Parquet file:
    // +---------------------------+---+-----+
    // |        Rest of file       | B |  A  |
    // +---------------------------+---+-----+
    // where A is the 8-byte footer (a 4-byte little-endian metadata length followed
    // by the 4-byte magic `PAR1`), and B is the Thrift-serialized file metadata of
    // that length.
    //
| fn parse_metadata(buf: &mut BufReader<R>) -> Result<ParquetMetaData> { |
| let file_size = buf.get_ref().len(); |
| if file_size < (FOOTER_SIZE as u64) { |
| return Err(general_err!( |
| "Invalid Parquet file. Size is smaller than footer" |
| )); |
| } |
| let mut footer_buffer: [u8; FOOTER_SIZE] = [0; FOOTER_SIZE]; |
| buf.seek(SeekFrom::End(-(FOOTER_SIZE as i64)))?; |
| buf.read_exact(&mut footer_buffer)?; |
| if footer_buffer[4..] != PARQUET_MAGIC { |
| return Err(general_err!("Invalid Parquet file. Corrupt footer")); |
| } |
| let metadata_len = LittleEndian::read_i32(&footer_buffer[0..4]) as i64; |
| if metadata_len < 0 { |
| return Err(general_err!( |
| "Invalid Parquet file. Metadata length is less than zero ({})", |
| metadata_len |
| )); |
| } |
| let metadata_start: i64 = file_size as i64 - FOOTER_SIZE as i64 - metadata_len; |
| if metadata_start < 0 { |
| return Err(general_err!( |
| "Invalid Parquet file. Metadata start is less than zero ({})", |
| metadata_start |
| )); |
| } |
| buf.seek(SeekFrom::Start(metadata_start as u64))?; |
        // Bound the metadata read to `metadata_len` bytes.
        let metadata_buf = buf.take(metadata_len as u64);
| |
| // TODO: row group filtering |
| let mut prot = TCompactInputProtocol::new(metadata_buf); |
| let mut t_file_metadata: TFileMetaData = |
| TFileMetaData::read_from_in_protocol(&mut prot).map_err(|e| { |
| ParquetError::General(format!("Could not parse metadata: {}", e)) |
| })?; |
| let schema = types::from_thrift(&mut t_file_metadata.schema)?; |
| let schema_descr = Rc::new(SchemaDescriptor::new(schema.clone())); |
| let mut row_groups = Vec::new(); |
| for rg in t_file_metadata.row_groups { |
| row_groups.push(RowGroupMetaData::from_thrift(schema_descr.clone(), rg)?); |
| } |
| let column_orders = |
| Self::parse_column_orders(t_file_metadata.column_orders, &schema_descr); |
| |
| let file_metadata = FileMetaData::new( |
| t_file_metadata.version, |
| t_file_metadata.num_rows, |
| t_file_metadata.created_by, |
| schema, |
| schema_descr, |
| column_orders, |
| ); |
| Ok(ParquetMetaData::new(file_metadata, row_groups)) |
| } |
| |
    /// Parses column orders from the Thrift definition.
| /// If no column orders are defined, returns `None`. |
| fn parse_column_orders( |
| t_column_orders: Option<Vec<TColumnOrder>>, |
| schema_descr: &SchemaDescriptor, |
| ) -> Option<Vec<ColumnOrder>> { |
| match t_column_orders { |
| Some(orders) => { |
| // Should always be the case |
| assert_eq!( |
| orders.len(), |
| schema_descr.num_columns(), |
| "Column order length mismatch" |
| ); |
| let mut res = Vec::new(); |
| for (i, column) in schema_descr.columns().iter().enumerate() { |
| match orders[i] { |
| TColumnOrder::TYPEORDER(_) => { |
| let sort_order = ColumnOrder::get_sort_order( |
| column.logical_type(), |
| column.physical_type(), |
| ); |
| res.push(ColumnOrder::TYPE_DEFINED_ORDER(sort_order)); |
| } |
| } |
| } |
| Some(res) |
| } |
| None => None, |
| } |
| } |
| } |
| |
| impl<R: 'static + ParquetReader> FileReader for SerializedFileReader<R> { |
| fn metadata(&self) -> &ParquetMetaData { |
| &self.metadata |
| } |
| |
| fn num_row_groups(&self) -> usize { |
| self.metadata.num_row_groups() |
| } |
| |
| fn get_row_group(&self, i: usize) -> Result<Box<RowGroupReader + '_>> { |
| let row_group_metadata = self.metadata.row_group(i); |
| // Row groups should be processed sequentially. |
| let f = self.buf.get_ref().try_clone()?; |
| Ok(Box::new(SerializedRowGroupReader::new( |
| f, |
| row_group_metadata, |
| ))) |
| } |
| |
| fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> { |
| RowIter::from_file(projection, self) |
| } |
| } |
| |
| impl TryFrom<File> for SerializedFileReader<File> { |
| type Error = ParquetError; |
| |
| fn try_from(file: File) -> Result<Self> { |
| Self::new(file) |
| } |
| } |
| |
| impl<'a> TryFrom<&'a Path> for SerializedFileReader<File> { |
| type Error = ParquetError; |
| |
| fn try_from(path: &Path) -> Result<Self> { |
| let file = File::open(path)?; |
| Self::try_from(file) |
| } |
| } |
| |
| impl TryFrom<String> for SerializedFileReader<File> { |
| type Error = ParquetError; |
| |
| fn try_from(path: String) -> Result<Self> { |
| Self::try_from(Path::new(&path)) |
| } |
| } |
| |
| impl<'a> TryFrom<&'a str> for SerializedFileReader<File> { |
| type Error = ParquetError; |
| |
| fn try_from(path: &str) -> Result<Self> { |
| Self::try_from(Path::new(&path)) |
| } |
| } |
| |
| /// Conversion into a [`RowIter`](crate::record::reader::RowIter) |
| /// using the full file schema over all row groups. |
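///
/// A sketch (the path is an assumption):
///
/// ```no_run
/// use std::convert::TryFrom;
/// use parquet::file::reader::SerializedFileReader;
///
/// let reader = SerializedFileReader::try_from("data.parquet").unwrap();
/// for row in reader {
///     println!("{}", row);
/// }
/// ```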
| impl IntoIterator for SerializedFileReader<File> { |
| type Item = Row; |
| type IntoIter = RowIter<'static>; |
| |
| fn into_iter(self) -> Self::IntoIter { |
| RowIter::from_file_into(Box::new(self)) |
| } |
| } |
| |
| /// A serialized implementation for Parquet [`RowGroupReader`]. |
| pub struct SerializedRowGroupReader<'a, R: ParquetReader> { |
| buf: BufReader<R>, |
| metadata: &'a RowGroupMetaData, |
| } |
| |
| impl<'a, R: 'static + ParquetReader> SerializedRowGroupReader<'a, R> { |
    /// Creates a new row group reader from a file and row group metadata.
| fn new(file: R, metadata: &'a RowGroupMetaData) -> Self { |
| let buf = BufReader::new(file); |
| Self { buf, metadata } |
| } |
| } |
| |
| impl<'a, R: 'static + ParquetReader> RowGroupReader for SerializedRowGroupReader<'a, R> { |
| fn metadata(&self) -> &RowGroupMetaData { |
| &self.metadata |
| } |
| |
| fn num_columns(&self) -> usize { |
| self.metadata.num_columns() |
| } |
| |
| // TODO: fix PARQUET-816 |
| fn get_column_page_reader(&self, i: usize) -> Result<Box<PageReader>> { |
| let col = self.metadata.column(i); |
| let mut col_start = col.data_page_offset(); |
| if col.has_dictionary_page() { |
| col_start = col.dictionary_page_offset().unwrap(); |
| } |
| let col_length = col.compressed_size(); |
| let file_chunk = |
| FileSource::new(self.buf.get_ref(), col_start as u64, col_length as usize); |
| let page_reader = SerializedPageReader::new( |
| file_chunk, |
| col.num_values(), |
| col.compression(), |
| col.column_descr().physical_type(), |
| )?; |
| Ok(Box::new(page_reader)) |
| } |
| |
| fn get_column_reader(&self, i: usize) -> Result<ColumnReader> { |
| let schema_descr = self.metadata.schema_descr(); |
| let col_descr = schema_descr.column(i); |
| let col_page_reader = self.get_column_page_reader(i)?; |
| let col_reader = match col_descr.physical_type() { |
| Type::BOOLEAN => ColumnReader::BoolColumnReader(ColumnReaderImpl::new( |
| col_descr, |
| col_page_reader, |
| )), |
| Type::INT32 => ColumnReader::Int32ColumnReader(ColumnReaderImpl::new( |
| col_descr, |
| col_page_reader, |
| )), |
| Type::INT64 => ColumnReader::Int64ColumnReader(ColumnReaderImpl::new( |
| col_descr, |
| col_page_reader, |
| )), |
| Type::INT96 => ColumnReader::Int96ColumnReader(ColumnReaderImpl::new( |
| col_descr, |
| col_page_reader, |
| )), |
| Type::FLOAT => ColumnReader::FloatColumnReader(ColumnReaderImpl::new( |
| col_descr, |
| col_page_reader, |
| )), |
| Type::DOUBLE => ColumnReader::DoubleColumnReader(ColumnReaderImpl::new( |
| col_descr, |
| col_page_reader, |
| )), |
| Type::BYTE_ARRAY => ColumnReader::ByteArrayColumnReader( |
| ColumnReaderImpl::new(col_descr, col_page_reader), |
| ), |
| Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader( |
| ColumnReaderImpl::new(col_descr, col_page_reader), |
| ), |
| }; |
| Ok(col_reader) |
| } |
| |
| fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> { |
| RowIter::from_row_group(projection, self) |
| } |
| } |
| |
| /// A serialized implementation for Parquet [`PageReader`]. |
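///
/// A sketch of iterating over the pages of one column chunk, obtained through
/// `RowGroupReader::get_column_page_reader` (the path is an assumption):
///
/// ```no_run
/// use std::fs::File;
/// use parquet::column::page::PageReader;
/// use parquet::file::reader::{FileReader, SerializedFileReader};
///
/// let reader = SerializedFileReader::new(File::open("data.parquet").unwrap()).unwrap();
/// let row_group = reader.get_row_group(0).unwrap();
/// let mut pages = row_group.get_column_page_reader(0).unwrap();
/// while let Ok(Some(page)) = pages.get_next_page() {
///     println!("page with {} values", page.num_values());
/// }
/// ```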
| pub struct SerializedPageReader<T: Read> { |
    // The file source buffer which references exactly the bytes for the column chunk
    // to be read by this page reader.
| buf: T, |
| |
    // The compression codec for this column chunk. Only set when the chunk is
    // compressed, i.e. the codec is not UNCOMPRESSED.
| decompressor: Option<Box<Codec>>, |
| |
| // The number of values we have seen so far. |
| seen_num_values: i64, |
| |
| // The number of total values in this column chunk. |
| total_num_values: i64, |
| |
| // Column chunk type. |
| physical_type: Type, |
| } |
| |
| impl<T: Read> SerializedPageReader<T> { |
    /// Creates a new serialized page reader from a file source.
| pub fn new( |
| buf: T, |
| total_num_values: i64, |
| compression: Compression, |
| physical_type: Type, |
| ) -> Result<Self> { |
| let decompressor = create_codec(compression)?; |
| let result = Self { |
| buf, |
| total_num_values, |
| seen_num_values: 0, |
| decompressor, |
| physical_type, |
| }; |
| Ok(result) |
| } |
| |
    /// Reads a page header from the Thrift-encoded input stream.
| fn read_page_header(&mut self) -> Result<PageHeader> { |
| let mut prot = TCompactInputProtocol::new(&mut self.buf); |
| let page_header = PageHeader::read_from_in_protocol(&mut prot)?; |
| Ok(page_header) |
| } |
| } |
| |
| impl<T: Read> PageReader for SerializedPageReader<T> { |
| fn get_next_page(&mut self) -> Result<Option<Page>> { |
| while self.seen_num_values < self.total_num_values { |
| let page_header = self.read_page_header()?; |
| |
            // For data page v2, the repetition and definition levels are stored
            // uncompressed at the start of the page, so we must account for their
            // combined length (`offset`) when decompressing.
            //
            // For all other page types the offset is 0, and the `true` flag means that
            // decompression is applied whenever a decompressor is defined.
| let mut offset: usize = 0; |
| let mut can_decompress = true; |
| |
| if let Some(ref header_v2) = page_header.data_page_header_v2 { |
| offset = (header_v2.definition_levels_byte_length |
| + header_v2.repetition_levels_byte_length) |
| as usize; |
                // When the is_compressed flag is missing, the page is considered
                // compressed.
| can_decompress = header_v2.is_compressed.unwrap_or(true); |
| } |
| |
| let compressed_len = page_header.compressed_page_size as usize - offset; |
| let uncompressed_len = page_header.uncompressed_page_size as usize - offset; |
            // We still need to read all the bytes from the buffered stream.
| let mut buffer = vec![0; offset + compressed_len]; |
| self.buf.read_exact(&mut buffer)?; |
| |
| // TODO: page header could be huge because of statistics. We should set a |
| // maximum page header size and abort if that is exceeded. |
| if let Some(decompressor) = self.decompressor.as_mut() { |
| if can_decompress { |
| let mut decompressed_buffer = Vec::with_capacity(uncompressed_len); |
| let decompressed_size = decompressor |
| .decompress(&buffer[offset..], &mut decompressed_buffer)?; |
| if decompressed_size != uncompressed_len { |
| return Err(general_err!( |
| "Actual decompressed size doesn't match the expected one ({} vs {})", |
| decompressed_size, |
| uncompressed_len |
| )); |
| } |
| if offset == 0 { |
| buffer = decompressed_buffer; |
| } else { |
| // Prepend saved offsets to the buffer |
| buffer.truncate(offset); |
| buffer.append(&mut decompressed_buffer); |
| } |
| } |
| } |
| |
| let result = match page_header.type_ { |
| PageType::DictionaryPage => { |
| assert!(page_header.dictionary_page_header.is_some()); |
| let dict_header = |
| page_header.dictionary_page_header.as_ref().unwrap(); |
| let is_sorted = dict_header.is_sorted.unwrap_or(false); |
| Page::DictionaryPage { |
| buf: ByteBufferPtr::new(buffer), |
| num_values: dict_header.num_values as u32, |
| encoding: Encoding::from(dict_header.encoding), |
| is_sorted, |
| } |
| } |
| PageType::DataPage => { |
| assert!(page_header.data_page_header.is_some()); |
| let header = page_header.data_page_header.unwrap(); |
| self.seen_num_values += header.num_values as i64; |
| Page::DataPage { |
| buf: ByteBufferPtr::new(buffer), |
| num_values: header.num_values as u32, |
| encoding: Encoding::from(header.encoding), |
| def_level_encoding: Encoding::from( |
| header.definition_level_encoding, |
| ), |
| rep_level_encoding: Encoding::from( |
| header.repetition_level_encoding, |
| ), |
| statistics: statistics::from_thrift( |
| self.physical_type, |
| header.statistics, |
| ), |
| } |
| } |
| PageType::DataPageV2 => { |
| assert!(page_header.data_page_header_v2.is_some()); |
| let header = page_header.data_page_header_v2.unwrap(); |
| let is_compressed = header.is_compressed.unwrap_or(true); |
| self.seen_num_values += header.num_values as i64; |
| Page::DataPageV2 { |
| buf: ByteBufferPtr::new(buffer), |
| num_values: header.num_values as u32, |
| encoding: Encoding::from(header.encoding), |
| num_nulls: header.num_nulls as u32, |
| num_rows: header.num_rows as u32, |
| def_levels_byte_len: header.definition_levels_byte_length as u32, |
| rep_levels_byte_len: header.repetition_levels_byte_length as u32, |
| is_compressed, |
| statistics: statistics::from_thrift( |
| self.physical_type, |
| header.statistics, |
| ), |
| } |
| } |
| _ => { |
| // For unknown page type (e.g., INDEX_PAGE), skip and read next. |
| continue; |
| } |
| }; |
| return Ok(Some(result)); |
| } |
| |
        // We are at the end of this column chunk and no more pages are left. Return None.
| Ok(None) |
| } |
| } |
| |
/// Implementation of a page iterator for a Parquet file.
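///
/// A sketch iterating page readers for column 0 across all row groups (the path is
/// an assumption):
///
/// ```no_run
/// use std::fs::File;
/// use std::rc::Rc;
/// use parquet::file::reader::{FilePageIterator, SerializedFileReader};
///
/// let reader =
///     Rc::new(SerializedFileReader::new(File::open("data.parquet").unwrap()).unwrap());
/// let page_iterator = FilePageIterator::new(0, reader).unwrap();
/// for page_reader in page_iterator {
///     let _page_reader = page_reader.unwrap();
/// }
/// ```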
| pub struct FilePageIterator { |
| column_index: usize, |
| row_group_indices: Box<Iterator<Item = usize>>, |
| file_reader: Rc<FileReader>, |
| } |
| |
| impl FilePageIterator { |
| /// Creates a page iterator for all row groups in file. |
| pub fn new(column_index: usize, file_reader: Rc<FileReader>) -> Result<Self> { |
| let num_row_groups = file_reader.metadata().num_row_groups(); |
| |
| let row_group_indices = Box::new(0..num_row_groups); |
| |
| Self::with_row_groups(column_index, row_group_indices, file_reader) |
| } |
| |
    /// Creates a page iterator from a Parquet file reader over only the given row groups.
| pub fn with_row_groups( |
| column_index: usize, |
| row_group_indices: Box<Iterator<Item = usize>>, |
| file_reader: Rc<FileReader>, |
| ) -> Result<Self> { |
| // Check that column_index is valid |
| let num_columns = file_reader |
| .metadata() |
| .file_metadata() |
| .schema_descr() |
| .num_columns(); |
| |
| if column_index >= num_columns { |
| return Err(ParquetError::IndexOutOfBound(column_index, num_columns)); |
| } |
| |
        // We don't check the row group indices here because the iterator may be infinite.
| Ok(Self { |
| column_index, |
| row_group_indices, |
| file_reader, |
| }) |
| } |
| } |
| |
| impl Iterator for FilePageIterator { |
| type Item = Result<Box<PageReader>>; |
| |
| fn next(&mut self) -> Option<Result<Box<PageReader>>> { |
| self.row_group_indices.next().map(|row_group_index| { |
| self.file_reader |
| .get_row_group(row_group_index) |
| .and_then(|r| r.get_column_page_reader(self.column_index)) |
| }) |
| } |
| } |
| |
| impl PageIterator for FilePageIterator { |
| fn schema(&mut self) -> Result<SchemaDescPtr> { |
| Ok(self |
| .file_reader |
| .metadata() |
| .file_metadata() |
| .schema_descr_ptr()) |
| } |
| |
| fn column_schema(&mut self) -> Result<ColumnDescPtr> { |
| self.schema().map(|s| s.column(self.column_index)) |
| } |
| } |
| |
| #[cfg(test)] |
| mod tests { |
| use super::*; |
| |
| use parquet_format::TypeDefinedOrder; |
| |
| use crate::basic::SortOrder; |
| use crate::record::RowAccessor; |
| use crate::schema::parser::parse_message_type; |
| use crate::util::test_common::{get_temp_file, get_test_file, get_test_path}; |
| |
| #[test] |
| fn test_file_reader_metadata_size_smaller_than_footer() { |
| let test_file = get_temp_file("corrupt-1.parquet", &[]); |
| let reader_result = SerializedFileReader::new(test_file); |
| assert!(reader_result.is_err()); |
| assert_eq!( |
| reader_result.err().unwrap(), |
| general_err!("Invalid Parquet file. Size is smaller than footer") |
| ); |
| } |
| |
    // Disabled: as written, `include_bytes!` requires a compile-time string literal
    // path, so the runtime path returned by `get_test_path` cannot be used here.
    // #[test]
| // fn test_cursor_and_file_has_the_same_behaviour() { |
| // let path = get_test_path("alltypes_plain.parquet"); |
| // let buffer = include_bytes!(path); |
| // let cursor = Cursor::new(buffer.as_ref()); |
| |
| // let read_from_file = |
| // SerializedFileReader::new(File::open("testdata/alltypes_plain.parquet"). |
| // unwrap()) .unwrap(); |
| // let read_from_cursor = SerializedFileReader::new(cursor).unwrap(); |
| |
| // let file_iter = read_from_file.get_row_iter(None).unwrap(); |
| // let cursor_iter = read_from_cursor.get_row_iter(None).unwrap(); |
| |
| // assert!(file_iter.eq(cursor_iter)); |
| // } |
| |
| #[test] |
| fn test_file_reader_metadata_corrupt_footer() { |
| let test_file = get_temp_file("corrupt-2.parquet", &[1, 2, 3, 4, 5, 6, 7, 8]); |
| let reader_result = SerializedFileReader::new(test_file); |
| assert!(reader_result.is_err()); |
| assert_eq!( |
| reader_result.err().unwrap(), |
| general_err!("Invalid Parquet file. Corrupt footer") |
| ); |
| } |
| |
| #[test] |
| fn test_file_reader_metadata_invalid_length() { |
| let test_file = |
| get_temp_file("corrupt-3.parquet", &[0, 0, 0, 255, b'P', b'A', b'R', b'1']); |
| let reader_result = SerializedFileReader::new(test_file); |
| assert!(reader_result.is_err()); |
| assert_eq!( |
| reader_result.err().unwrap(), |
| general_err!( |
| "Invalid Parquet file. Metadata length is less than zero (-16777216)" |
| ) |
| ); |
| } |
| |
| #[test] |
| fn test_file_reader_metadata_invalid_start() { |
| let test_file = |
| get_temp_file("corrupt-4.parquet", &[255, 0, 0, 0, b'P', b'A', b'R', b'1']); |
| let reader_result = SerializedFileReader::new(test_file); |
| assert!(reader_result.is_err()); |
| assert_eq!( |
| reader_result.err().unwrap(), |
| general_err!("Invalid Parquet file. Metadata start is less than zero (-255)") |
| ); |
| } |
| |
| #[test] |
| fn test_file_reader_column_orders_parse() { |
        // Define a simple schema; we do not need to provide logical types.
| let mut fields = vec![ |
| Rc::new( |
| SchemaType::primitive_type_builder("col1", Type::INT32) |
| .build() |
| .unwrap(), |
| ), |
| Rc::new( |
| SchemaType::primitive_type_builder("col2", Type::FLOAT) |
| .build() |
| .unwrap(), |
| ), |
| ]; |
| let schema = SchemaType::group_type_builder("schema") |
| .with_fields(&mut fields) |
| .build() |
| .unwrap(); |
| let schema_descr = SchemaDescriptor::new(Rc::new(schema)); |
| |
| let t_column_orders = Some(vec![ |
| TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), |
| TColumnOrder::TYPEORDER(TypeDefinedOrder::new()), |
| ]); |
| |
| assert_eq!( |
| SerializedFileReader::<File>::parse_column_orders( |
| t_column_orders, |
| &schema_descr |
| ), |
| Some(vec![ |
| ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED), |
| ColumnOrder::TYPE_DEFINED_ORDER(SortOrder::SIGNED) |
| ]) |
| ); |
| |
| // Test when no column orders are defined. |
| assert_eq!( |
| SerializedFileReader::<File>::parse_column_orders(None, &schema_descr), |
| None |
| ); |
| } |
| |
| #[test] |
| #[should_panic(expected = "Column order length mismatch")] |
| fn test_file_reader_column_orders_len_mismatch() { |
| let schema = SchemaType::group_type_builder("schema").build().unwrap(); |
| let schema_descr = SchemaDescriptor::new(Rc::new(schema)); |
| |
| let t_column_orders = |
| Some(vec![TColumnOrder::TYPEORDER(TypeDefinedOrder::new())]); |
| |
| SerializedFileReader::<File>::parse_column_orders(t_column_orders, &schema_descr); |
| } |
| |
| #[test] |
| fn test_file_reader_try_from() { |
| // Valid file path |
| let test_file = get_test_file("alltypes_plain.parquet"); |
| let test_path_buf = get_test_path("alltypes_plain.parquet"); |
| let test_path = test_path_buf.as_path(); |
| let test_path_str = test_path.to_str().unwrap(); |
| |
| let reader = SerializedFileReader::try_from(test_file); |
| assert!(reader.is_ok()); |
| |
| let reader = SerializedFileReader::try_from(test_path); |
| assert!(reader.is_ok()); |
| |
| let reader = SerializedFileReader::try_from(test_path_str); |
| assert!(reader.is_ok()); |
| |
| let reader = SerializedFileReader::try_from(test_path_str.to_string()); |
| assert!(reader.is_ok()); |
| |
| // Invalid file path |
| let test_path = Path::new("invalid.parquet"); |
| let test_path_str = test_path.to_str().unwrap(); |
| |
| let reader = SerializedFileReader::try_from(test_path); |
| assert!(reader.is_err()); |
| |
| let reader = SerializedFileReader::try_from(test_path_str); |
| assert!(reader.is_err()); |
| |
| let reader = SerializedFileReader::try_from(test_path_str.to_string()); |
| assert!(reader.is_err()); |
| } |
| |
| #[test] |
| fn test_file_reader_into_iter() -> Result<()> { |
| let path = get_test_path("alltypes_plain.parquet"); |
| let vec = vec![path.clone(), path.clone()] |
| .iter() |
| .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) |
| .flat_map(|r| r.into_iter()) |
| .flat_map(|r| r.get_int(0)) |
| .collect::<Vec<_>>(); |
| |
| // rows in the parquet file are not sorted by "id" |
| // each file contains [id:4, id:5, id:6, id:7, id:2, id:3, id:0, id:1] |
| assert_eq!(vec, vec![4, 5, 6, 7, 2, 3, 0, 1, 4, 5, 6, 7, 2, 3, 0, 1]); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_file_reader_into_iter_project() -> Result<()> { |
| let path = get_test_path("alltypes_plain.parquet"); |
| let result = vec![path] |
| .iter() |
| .map(|p| SerializedFileReader::try_from(p.as_path()).unwrap()) |
| .flat_map(|r| { |
| let schema = "message schema { OPTIONAL INT32 id; }"; |
| let proj = parse_message_type(&schema).ok(); |
| |
| r.into_iter().project(proj).unwrap() |
| }) |
| .map(|r| format!("{}", r)) |
| .collect::<Vec<_>>() |
| .join(","); |
| |
| assert_eq!( |
| result, |
| "{id: 4},{id: 5},{id: 6},{id: 7},{id: 2},{id: 3},{id: 0},{id: 1}" |
| ); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_reuse_file_chunk() { |
| // This test covers the case of maintaining the correct start position in a file |
| // stream for each column reader after initializing and moving to the next one |
| // (without necessarily reading the entire column). |
| let test_file = get_test_file("alltypes_plain.parquet"); |
| let reader = SerializedFileReader::new(test_file).unwrap(); |
| let row_group = reader.get_row_group(0).unwrap(); |
| |
| let mut page_readers = Vec::new(); |
| for i in 0..row_group.num_columns() { |
| page_readers.push(row_group.get_column_page_reader(i).unwrap()); |
| } |
| |
        // Now buffer each column reader; we do not expect any failures like:
| // General("underlying Thrift error: end of file") |
| for mut page_reader in page_readers { |
| assert!(page_reader.get_next_page().is_ok()); |
| } |
| } |
| |
| #[test] |
| fn test_file_reader() { |
| let test_file = get_test_file("alltypes_plain.parquet"); |
| let reader_result = SerializedFileReader::new(test_file); |
| assert!(reader_result.is_ok()); |
| let reader = reader_result.unwrap(); |
| |
| // Test contents in Parquet metadata |
| let metadata = reader.metadata(); |
| assert_eq!(metadata.num_row_groups(), 1); |
| |
| // Test contents in file metadata |
| let file_metadata = metadata.file_metadata(); |
| assert!(file_metadata.created_by().is_some()); |
| assert_eq!( |
| file_metadata.created_by().as_ref().unwrap(), |
| "impala version 1.3.0-INTERNAL (build 8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)" |
| ); |
| assert_eq!(file_metadata.num_rows(), 8); |
| assert_eq!(file_metadata.version(), 1); |
| assert_eq!(file_metadata.column_orders(), None); |
| |
| // Test contents in row group metadata |
| let row_group_metadata = metadata.row_group(0); |
| assert_eq!(row_group_metadata.num_columns(), 11); |
| assert_eq!(row_group_metadata.num_rows(), 8); |
| assert_eq!(row_group_metadata.total_byte_size(), 671); |
| // Check each column order |
| for i in 0..row_group_metadata.num_columns() { |
| assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED); |
| } |
| |
| // Test row group reader |
| let row_group_reader_result = reader.get_row_group(0); |
| assert!(row_group_reader_result.is_ok()); |
| let row_group_reader: Box<RowGroupReader> = row_group_reader_result.unwrap(); |
| assert_eq!( |
| row_group_reader.num_columns(), |
| row_group_metadata.num_columns() |
| ); |
| assert_eq!( |
| row_group_reader.metadata().total_byte_size(), |
| row_group_metadata.total_byte_size() |
| ); |
| |
| // Test page readers |
| // TODO: test for every column |
| let page_reader_0_result = row_group_reader.get_column_page_reader(0); |
| assert!(page_reader_0_result.is_ok()); |
| let mut page_reader_0: Box<PageReader> = page_reader_0_result.unwrap(); |
| let mut page_count = 0; |
| while let Ok(Some(page)) = page_reader_0.get_next_page() { |
| let is_expected_page = match page { |
| Page::DictionaryPage { |
| buf, |
| num_values, |
| encoding, |
| is_sorted, |
| } => { |
| assert_eq!(buf.len(), 32); |
| assert_eq!(num_values, 8); |
| assert_eq!(encoding, Encoding::PLAIN_DICTIONARY); |
| assert_eq!(is_sorted, false); |
| true |
| } |
| Page::DataPage { |
| buf, |
| num_values, |
| encoding, |
| def_level_encoding, |
| rep_level_encoding, |
| statistics, |
| } => { |
| assert_eq!(buf.len(), 11); |
| assert_eq!(num_values, 8); |
| assert_eq!(encoding, Encoding::PLAIN_DICTIONARY); |
| assert_eq!(def_level_encoding, Encoding::RLE); |
| assert_eq!(rep_level_encoding, Encoding::BIT_PACKED); |
| assert!(statistics.is_none()); |
| true |
| } |
| _ => false, |
| }; |
| assert!(is_expected_page); |
| page_count += 1; |
| } |
| assert_eq!(page_count, 2); |
| } |
| |
| #[test] |
| fn test_file_reader_datapage_v2() { |
| let test_file = get_test_file("datapage_v2.snappy.parquet"); |
| let reader_result = SerializedFileReader::new(test_file); |
| assert!(reader_result.is_ok()); |
| let reader = reader_result.unwrap(); |
| |
| // Test contents in Parquet metadata |
| let metadata = reader.metadata(); |
| assert_eq!(metadata.num_row_groups(), 1); |
| |
| // Test contents in file metadata |
| let file_metadata = metadata.file_metadata(); |
| assert!(file_metadata.created_by().is_some()); |
| assert_eq!( |
| file_metadata.created_by().as_ref().unwrap(), |
| "parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)" |
| ); |
| assert_eq!(file_metadata.num_rows(), 5); |
| assert_eq!(file_metadata.version(), 1); |
| assert_eq!(file_metadata.column_orders(), None); |
| |
| let row_group_metadata = metadata.row_group(0); |
| |
| // Check each column order |
| for i in 0..row_group_metadata.num_columns() { |
| assert_eq!(file_metadata.column_order(i), ColumnOrder::UNDEFINED); |
| } |
| |
| // Test row group reader |
| let row_group_reader_result = reader.get_row_group(0); |
| assert!(row_group_reader_result.is_ok()); |
| let row_group_reader: Box<RowGroupReader> = row_group_reader_result.unwrap(); |
| assert_eq!( |
| row_group_reader.num_columns(), |
| row_group_metadata.num_columns() |
| ); |
| assert_eq!( |
| row_group_reader.metadata().total_byte_size(), |
| row_group_metadata.total_byte_size() |
| ); |
| |
| // Test page readers |
| // TODO: test for every column |
| let page_reader_0_result = row_group_reader.get_column_page_reader(0); |
| assert!(page_reader_0_result.is_ok()); |
| let mut page_reader_0: Box<PageReader> = page_reader_0_result.unwrap(); |
| let mut page_count = 0; |
| while let Ok(Some(page)) = page_reader_0.get_next_page() { |
| let is_expected_page = match page { |
| Page::DictionaryPage { |
| buf, |
| num_values, |
| encoding, |
| is_sorted, |
| } => { |
| assert_eq!(buf.len(), 7); |
| assert_eq!(num_values, 1); |
| assert_eq!(encoding, Encoding::PLAIN); |
| assert_eq!(is_sorted, false); |
| true |
| } |
| Page::DataPageV2 { |
| buf, |
| num_values, |
| encoding, |
| num_nulls, |
| num_rows, |
| def_levels_byte_len, |
| rep_levels_byte_len, |
| is_compressed, |
| statistics, |
| } => { |
| assert_eq!(buf.len(), 4); |
| assert_eq!(num_values, 5); |
| assert_eq!(encoding, Encoding::RLE_DICTIONARY); |
| assert_eq!(num_nulls, 1); |
| assert_eq!(num_rows, 5); |
| assert_eq!(def_levels_byte_len, 2); |
| assert_eq!(rep_levels_byte_len, 0); |
| assert_eq!(is_compressed, true); |
| assert!(statistics.is_some()); |
| true |
| } |
| _ => false, |
| }; |
| assert!(is_expected_page); |
| page_count += 1; |
| } |
| assert_eq!(page_count, 2); |
| } |
| |
| #[test] |
| fn test_page_iterator() { |
| let file = get_test_file("alltypes_plain.parquet"); |
| let file_reader = Rc::new(SerializedFileReader::new(file).unwrap()); |
| |
| let mut page_iterator = FilePageIterator::new(0, file_reader.clone()).unwrap(); |
| |
| // read first page |
| let page = page_iterator.next(); |
| assert!(page.is_some()); |
| assert!(page.unwrap().is_ok()); |
| |
| // reach end of file |
| let page = page_iterator.next(); |
| assert!(page.is_none()); |
| |
| let row_group_indices = Box::new(0..1); |
| let mut page_iterator = |
| FilePageIterator::with_row_groups(0, row_group_indices, file_reader.clone()) |
| .unwrap(); |
| |
| // read first page |
| let page = page_iterator.next(); |
| assert!(page.is_some()); |
| assert!(page.unwrap().is_ok()); |
| |
| // reach end of file |
| let page = page_iterator.next(); |
| assert!(page.is_none()); |
| } |
| } |