Add round trip tests for reading/writing parquet metadata (#6463)
* Add round trip tests for reading/writing parquet metadata
* remove redundant tests
* Apply suggestions from code review
Co-authored-by: Matthijs Brobbel <m1brobbel@gmail.com>
* fix merge
---------
Co-authored-by: Matthijs Brobbel <m1brobbel@gmail.com>
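For context, these tests exercise the standalone metadata round trip: `ParquetMetaDataWriter` serializes a `ParquetMetaData` (footer metadata only, no data pages) and `ParquetMetaDataReader` parses it back as if it were a file footer. A minimal sketch of that round trip, mirroring the test code in the diff below and assuming `parquet_bytes` holds a complete in-memory parquet file:

```rust
use bytes::{BufMut, Bytes, BytesMut};
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter};

/// Parse metadata from a full parquet file, re-serialize just the metadata,
/// and parse it back from the standalone bytes.
fn metadata_roundtrip(parquet_bytes: &Bytes) -> ParquetMetaData {
    // parse the footer metadata from the complete file
    let original = ParquetMetaDataReader::new()
        .parse_and_finish(parquet_bytes)
        .unwrap();

    // serialize only the metadata (no data pages) into an in-memory buffer
    let mut buf = BytesMut::new().writer();
    ParquetMetaDataWriter::new(&mut buf, &original)
        .finish()
        .unwrap();
    let metadata_bytes = buf.into_inner().freeze();

    // the standalone metadata bytes parse just like a file footer
    ParquetMetaDataReader::new()
        .parse_and_finish(&metadata_bytes)
        .unwrap()
}
```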
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 7378433..2d09cd1 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -269,6 +269,82 @@
);
}
+ #[test]
+ fn test_metadata_read_write_roundtrip() {
+ let parquet_bytes = create_parquet_file();
+
+ // read the metadata from the file
+ let original_metadata = ParquetMetaDataReader::new()
+ .parse_and_finish(&parquet_bytes)
+ .unwrap();
+
+ // serialize the metadata to bytes, then read it back and ensure it is the same
+ let metadata_bytes = metadata_to_bytes(&original_metadata);
+ assert_ne!(
+ metadata_bytes.len(),
+ parquet_bytes.len(),
+ "metadata bytes should be a proper subset of the full parquet file"
+ );
+
+ let roundtrip_metadata = ParquetMetaDataReader::new()
+ .parse_and_finish(&metadata_bytes)
+ .unwrap();
+
+ assert_eq!(original_metadata, roundtrip_metadata);
+ }
+
+ #[test]
+ fn test_metadata_read_write_roundtrip_page_index() {
+ let parquet_bytes = create_parquet_file();
+
+ // read the metadata from the file, including the page index structures
+ // (which are stored in the file separately from the thrift footer metadata)
+ let original_metadata = ParquetMetaDataReader::new()
+ .with_page_indexes(true)
+ .parse_and_finish(&parquet_bytes)
+ .unwrap();
+
+ // serialize the metadata to bytes, then read it back and ensure it is the same
+ let metadata_bytes = metadata_to_bytes(&original_metadata);
+ let roundtrip_metadata = ParquetMetaDataReader::new()
+ .with_page_indexes(true)
+ .parse_and_finish(&metadata_bytes)
+ .unwrap();
+
+ // Normalize the metadata first to remove the page index offsets, which
+ // differ depending on where the index structures are stored
+ let original_metadata = normalize_locations(original_metadata);
+ let roundtrip_metadata = normalize_locations(roundtrip_metadata);
+ assert_eq!(
+ format!("{original_metadata:#?}"),
+ format!("{roundtrip_metadata:#?}")
+ );
+ assert_eq!(original_metadata, roundtrip_metadata);
+ }
+
+ /// Sets the page index offset locations in the metadata to `None`
+ ///
+ /// The offsets record where the index structures are located in the file,
+ /// and thus differ depending on where the structures are stored.
+ fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
+ let mut metadata_builder = metadata.into_builder();
+ for rg in metadata_builder.take_row_groups() {
+ let mut rg_builder = rg.into_builder();
+ for col in rg_builder.take_columns() {
+ rg_builder = rg_builder.add_column_metadata(
+ col.into_builder()
+ .set_offset_index_offset(None)
+ .set_index_page_offset(None)
+ .set_column_index_offset(None)
+ .build()
+ .unwrap(),
+ );
+ }
+ let rg = rg_builder.build().unwrap();
+ metadata_builder = metadata_builder.add_row_group(rg);
+ }
+ metadata_builder.build()
+ }
+
/// Write a parquet file into an in-memory buffer
fn create_parquet_file() -> Bytes {
let mut buf = vec![];
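The `metadata_to_bytes` helper called by the new tests is not shown in this hunk; it is defined elsewhere in the test module. A plausible sketch of it (hypothetical; the real definition may differ in detail) simply wraps `ParquetMetaDataWriter`:

```rust
use bytes::Bytes;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataWriter};

// Hypothetical sketch of the `metadata_to_bytes` helper referenced by the
// tests above: serialize the metadata (including any page indexes) into a
// standalone buffer.
fn metadata_to_bytes(metadata: &ParquetMetaData) -> Bytes {
    let mut buf = vec![];
    ParquetMetaDataWriter::new(&mut buf, metadata)
        .finish()
        .unwrap();
    Bytes::from(buf)
}
```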
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index 69a939e..87f1fde 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -378,298 +378,3 @@
}
}
}
-
-#[cfg(test)]
-#[cfg(feature = "arrow")]
-#[cfg(feature = "async")]
-mod tests {
- use std::sync::Arc;
-
- use crate::file::metadata::{
- ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter,
- RowGroupMetaData,
- };
- use crate::file::properties::{EnabledStatistics, WriterProperties};
- use crate::file::reader::{FileReader, SerializedFileReader};
- use crate::{
- arrow::ArrowWriter,
- file::{page_index::index::Index, serialized_reader::ReadOptionsBuilder},
- };
- use arrow_array::{ArrayRef, Int32Array, RecordBatch};
- use arrow_schema::{DataType as ArrowDataType, Field, Schema};
- use bytes::{BufMut, Bytes, BytesMut};
-
- struct TestMetadata {
- #[allow(dead_code)]
- file_size: usize,
- metadata: ParquetMetaData,
- }
-
- fn has_page_index(metadata: &ParquetMetaData) -> bool {
- match metadata.column_index() {
- Some(column_index) => column_index
- .iter()
- .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))),
- None => false,
- }
- }
-
- #[test]
- fn test_roundtrip_parquet_metadata_without_page_index() {
- // We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so
- // we at least test round trip without them
- let metadata = get_test_metadata(false, false);
- assert!(!has_page_index(&metadata.metadata));
-
- let mut buf = BytesMut::new().writer();
- {
- let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
- writer.finish().unwrap();
- }
-
- let data = buf.into_inner().freeze();
-
- let decoded_metadata = ParquetMetaDataReader::new()
- .parse_and_finish(&data)
- .unwrap();
- assert!(!has_page_index(&metadata.metadata));
-
- assert_eq!(metadata.metadata, decoded_metadata);
- }
-
- fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
- let mut buf = BytesMut::new().writer();
- let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
- "a",
- ArrowDataType::Int32,
- true,
- )]));
-
- // build row groups / pages that exercise different combinations of nulls and values
- // note that below we set the row group and page sizes to 4 and 2 respectively
- // so that these "groupings" make sense
- let a: ArrayRef = Arc::new(Int32Array::from(vec![
- // a row group that has all values
- Some(i32::MIN),
- Some(-1),
- Some(1),
- Some(i32::MAX),
- // a row group with a page of all nulls and a page of all values
- None,
- None,
- Some(2),
- Some(3),
- // a row group that has all null pages
- None,
- None,
- None,
- None,
- // a row group having 1 page with all values and 1 page with some nulls
- Some(4),
- Some(5),
- None,
- Some(6),
- // a row group having 1 page with all nulls and 1 page with some nulls
- None,
- None,
- Some(7),
- None,
- // a row group having all pages with some nulls
- None,
- Some(8),
- Some(9),
- None,
- ]));
-
- let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
-
- let writer_props_builder = match write_page_index {
- true => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Page),
- false => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk),
- };
-
- // tune the size or pages to the data above
- // to make sure we exercise code paths where all items in a page are null, etc.
- let writer_props = writer_props_builder
- .set_max_row_group_size(4)
- .set_data_page_row_count_limit(2)
- .set_write_batch_size(2)
- .build();
-
- let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
- writer.write(&batch).unwrap();
- writer.close().unwrap();
-
- let data = buf.into_inner().freeze();
-
- let reader_opts = match read_page_index {
- true => ReadOptionsBuilder::new().with_page_index().build(),
- false => ReadOptionsBuilder::new().build(),
- };
- let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
- let metadata = reader.metadata().clone();
- TestMetadata {
- file_size: data.len(),
- metadata,
- }
- }
-
- /// Temporary function so we can test loading metadata with page indexes
- /// while we haven't fully figured out how to load it cleanly
- async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
- use crate::arrow::async_reader::MetadataFetch;
- use crate::errors::Result as ParquetResult;
- use futures::future::BoxFuture;
- use futures::FutureExt;
- use std::ops::Range;
-
- /// Adapt a `Bytes` to a `MetadataFetch` implementation.
- struct AsyncBytes {
- data: Bytes,
- }
-
- impl AsyncBytes {
- fn new(data: Bytes) -> Self {
- Self { data }
- }
- }
-
- impl MetadataFetch for AsyncBytes {
- fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
- async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
- }
- }
-
- /// A `MetadataFetch` implementation that reads from a subset of the full data
- /// while accepting ranges that address the full data.
- struct MaskedBytes {
- inner: Box<dyn MetadataFetch + Send>,
- inner_range: Range<usize>,
- }
-
- impl MaskedBytes {
- fn new(inner: Box<dyn MetadataFetch + Send>, inner_range: Range<usize>) -> Self {
- Self { inner, inner_range }
- }
- }
-
- impl MetadataFetch for &mut MaskedBytes {
- fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
- let inner_range = self.inner_range.clone();
- println!("inner_range: {:?}", inner_range);
- println!("range: {:?}", range);
- assert!(inner_range.start <= range.start && inner_range.end >= range.end);
- let range =
- range.start - self.inner_range.start..range.end - self.inner_range.start;
- self.inner.fetch(range)
- }
- }
-
- let metadata_length = data.len();
- let mut reader = MaskedBytes::new(
- Box::new(AsyncBytes::new(data)),
- file_size - metadata_length..file_size,
- );
- ParquetMetaDataReader::new()
- .with_page_indexes(true)
- .load_and_finish(&mut reader, file_size)
- .await
- .unwrap()
- }
-
- fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
- assert_eq!(left.column_descr(), right.column_descr());
- assert_eq!(left.encodings(), right.encodings());
- assert_eq!(left.num_values(), right.num_values());
- assert_eq!(left.compressed_size(), right.compressed_size());
- assert_eq!(left.data_page_offset(), right.data_page_offset());
- assert_eq!(left.statistics(), right.statistics());
- assert_eq!(left.offset_index_length(), right.offset_index_length());
- assert_eq!(left.column_index_length(), right.column_index_length());
- assert_eq!(
- left.unencoded_byte_array_data_bytes(),
- right.unencoded_byte_array_data_bytes()
- );
- }
-
- fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
- assert_eq!(left.num_rows(), right.num_rows());
- assert_eq!(left.file_offset(), right.file_offset());
- assert_eq!(left.total_byte_size(), right.total_byte_size());
- assert_eq!(left.schema_descr(), right.schema_descr());
- assert_eq!(left.num_columns(), right.num_columns());
- left.columns()
- .iter()
- .zip(right.columns().iter())
- .for_each(|(lc, rc)| {
- check_columns_are_equivalent(lc, rc);
- });
- }
-
- #[tokio::test]
- async fn test_encode_parquet_metadata_with_page_index() {
- // Create a ParquetMetadata with page index information
- let metadata = get_test_metadata(true, true);
- assert!(has_page_index(&metadata.metadata));
-
- let mut buf = BytesMut::new().writer();
- {
- let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
- writer.finish().unwrap();
- }
-
- let data = buf.into_inner().freeze();
-
- let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;
-
- // Because the page index offsets will differ, compare invariant parts of the metadata
- assert_eq!(
- metadata.metadata.file_metadata(),
- decoded_metadata.file_metadata()
- );
- assert_eq!(
- metadata.metadata.column_index(),
- decoded_metadata.column_index()
- );
- assert_eq!(
- metadata.metadata.offset_index(),
- decoded_metadata.offset_index()
- );
- assert_eq!(
- metadata.metadata.num_row_groups(),
- decoded_metadata.num_row_groups()
- );
-
- // check that the mins and maxes are what we expect for each page
- // also indirectly checking that the pages were written out as we expected them to be laid out
- // (if they're not, or something gets refactored in the future that breaks that assumption,
- // this test may have to drop down to a lower level and create metadata directly instead of relying on
- // writing an entire file)
- let column_indexes = metadata.metadata.column_index().unwrap();
- assert_eq!(column_indexes.len(), 6);
- // make sure each row group has 2 pages by checking the first column
- // page counts for each column for each row group, should all be the same and there should be
- // 12 pages in total across 6 row groups / 1 column
- let mut page_counts = vec![];
- for row_group in column_indexes {
- for column in row_group {
- match column {
- Index::INT32(column_index) => {
- page_counts.push(column_index.indexes.len());
- }
- _ => panic!("unexpected column index type"),
- }
- }
- }
- assert_eq!(page_counts, vec![2; 6]);
-
- metadata
- .metadata
- .row_groups()
- .iter()
- .zip(decoded_metadata.row_groups().iter())
- .for_each(|(left, right)| {
- check_row_groups_are_equivalent(left, right);
- });
- }
-}
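The deleted module's ad-hoc `load_metadata_from_bytes` helper is covered by the simpler round-trip tests above, and the `ParquetMetaDataReader::load_and_finish` API it exercised remains available for async use with any `MetadataFetch` implementation. A condensed sketch of that async path, reusing the `AsyncBytes` adapter idea from the removed code (assumes the `arrow` and `async` features are enabled):

```rust
use std::ops::Range;

use bytes::Bytes;
use futures::{future::BoxFuture, FutureExt};
use parquet::arrow::async_reader::MetadataFetch;
use parquet::errors::Result as ParquetResult;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};

/// Adapt an in-memory `Bytes` buffer to `MetadataFetch`, as the removed
/// tests did with their `AsyncBytes` helper.
struct AsyncBytes {
    data: Bytes,
}

impl MetadataFetch for AsyncBytes {
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
        async move { Ok(self.data.slice(range)) }.boxed()
    }
}

/// Load metadata, including page indexes, from a buffer holding a full file.
async fn load_metadata(data: Bytes) -> ParquetMetaData {
    let file_size = data.len();
    ParquetMetaDataReader::new()
        .with_page_indexes(true)
        .load_and_finish(AsyncBytes { data }, file_size)
        .await
        .unwrap()
}
```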