Add round trip tests for reading/writing parquet metadata (#6463)

* Add round trip tests for reading/writing parquet metadata

* remove redundant tests

* Apply suggestions from code review

Co-authored-by: Matthijs Brobbel <m1brobbel@gmail.com>

* fix merge

---------

Co-authored-by: Matthijs Brobbel <m1brobbel@gmail.com>
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index 7378433..2d09cd1 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -269,6 +269,82 @@
         );
     }
 
+    #[test]
+    fn test_metadata_read_write_roundtrip() {
+        let parquet_bytes = create_parquet_file();
+
+        // read the metadata from the file
+        let original_metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&parquet_bytes)
+            .unwrap();
+
+        // read metadata back from the serialized bytes and ensure it is the same
+        let metadata_bytes = metadata_to_bytes(&original_metadata);
+        assert_ne!(
+            metadata_bytes.len(),
+            parquet_bytes.len(),
+            "metadata is subset of parquet"
+        );
+
+        let roundtrip_metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&metadata_bytes)
+            .unwrap();
+
+        assert_eq!(original_metadata, roundtrip_metadata);
+    }
+
+    #[test]
+    fn test_metadata_read_write_roundtrip_page_index() {
+        let parquet_bytes = create_parquet_file();
+
+        // read the metadata from the file including the page index structures
+        // (which are stored elsewhere in the footer)
+        let original_metadata = ParquetMetaDataReader::new()
+            .with_page_indexes(true)
+            .parse_and_finish(&parquet_bytes)
+            .unwrap();
+
+        // read metadata back from the serialized bytes and ensure it is the same
+        let metadata_bytes = metadata_to_bytes(&original_metadata);
+        let roundtrip_metadata = ParquetMetaDataReader::new()
+            .with_page_indexes(true)
+            .parse_and_finish(&metadata_bytes)
+            .unwrap();
+
+        // Need to normalize the metadata first to remove offsets in data
+        let original_metadata = normalize_locations(original_metadata);
+        let roundtrip_metadata = normalize_locations(roundtrip_metadata);
+        assert_eq!(
+            format!("{original_metadata:#?}"),
+            format!("{roundtrip_metadata:#?}")
+        );
+        assert_eq!(original_metadata, roundtrip_metadata);
+    }
+
+    /// Sets the page index offset locations in the metadata to `None`
+    ///
+    /// This is because the offsets are used to find the relative location of the index
+    /// structures, and thus differ depending on how the structures are stored.
+    fn normalize_locations(metadata: ParquetMetaData) -> ParquetMetaData {
+        let mut metadata_builder = metadata.into_builder();
+        for rg in metadata_builder.take_row_groups() {
+            let mut rg_builder = rg.into_builder();
+            for col in rg_builder.take_columns() {
+                rg_builder = rg_builder.add_column_metadata(
+                    col.into_builder()
+                        .set_offset_index_offset(None)
+                        .set_index_page_offset(None)
+                        .set_column_index_offset(None)
+                        .build()
+                        .unwrap(),
+                );
+            }
+            let rg = rg_builder.build().unwrap();
+            metadata_builder = metadata_builder.add_row_group(rg);
+        }
+        metadata_builder.build()
+    }
+
     /// Write a parquet filed into an in memory buffer
     fn create_parquet_file() -> Bytes {
         let mut buf = vec![];
diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs
index 69a939e..87f1fde 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -378,298 +378,3 @@
         }
     }
 }
-
-#[cfg(test)]
-#[cfg(feature = "arrow")]
-#[cfg(feature = "async")]
-mod tests {
-    use std::sync::Arc;
-
-    use crate::file::metadata::{
-        ColumnChunkMetaData, ParquetMetaData, ParquetMetaDataReader, ParquetMetaDataWriter,
-        RowGroupMetaData,
-    };
-    use crate::file::properties::{EnabledStatistics, WriterProperties};
-    use crate::file::reader::{FileReader, SerializedFileReader};
-    use crate::{
-        arrow::ArrowWriter,
-        file::{page_index::index::Index, serialized_reader::ReadOptionsBuilder},
-    };
-    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
-    use arrow_schema::{DataType as ArrowDataType, Field, Schema};
-    use bytes::{BufMut, Bytes, BytesMut};
-
-    struct TestMetadata {
-        #[allow(dead_code)]
-        file_size: usize,
-        metadata: ParquetMetaData,
-    }
-
-    fn has_page_index(metadata: &ParquetMetaData) -> bool {
-        match metadata.column_index() {
-            Some(column_index) => column_index
-                .iter()
-                .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, Index::NONE))),
-            None => false,
-        }
-    }
-
-    #[test]
-    fn test_roundtrip_parquet_metadata_without_page_index() {
-        // We currently don't have an ad-hoc ParquetMetadata loader that can load page indexes so
-        // we at least test round trip without them
-        let metadata = get_test_metadata(false, false);
-        assert!(!has_page_index(&metadata.metadata));
-
-        let mut buf = BytesMut::new().writer();
-        {
-            let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
-            writer.finish().unwrap();
-        }
-
-        let data = buf.into_inner().freeze();
-
-        let decoded_metadata = ParquetMetaDataReader::new()
-            .parse_and_finish(&data)
-            .unwrap();
-        assert!(!has_page_index(&metadata.metadata));
-
-        assert_eq!(metadata.metadata, decoded_metadata);
-    }
-
-    fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> TestMetadata {
-        let mut buf = BytesMut::new().writer();
-        let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
-            "a",
-            ArrowDataType::Int32,
-            true,
-        )]));
-
-        // build row groups / pages that exercise different combinations of nulls and values
-        // note that below we set the row group and page sizes to 4 and 2 respectively
-        // so that these "groupings" make sense
-        let a: ArrayRef = Arc::new(Int32Array::from(vec![
-            // a row group that has all values
-            Some(i32::MIN),
-            Some(-1),
-            Some(1),
-            Some(i32::MAX),
-            // a row group with a page of all nulls and a page of all values
-            None,
-            None,
-            Some(2),
-            Some(3),
-            // a row group that has all null pages
-            None,
-            None,
-            None,
-            None,
-            // a row group having 1 page with all values and 1 page with some nulls
-            Some(4),
-            Some(5),
-            None,
-            Some(6),
-            // a row group having 1 page with all nulls and 1 page with some nulls
-            None,
-            None,
-            Some(7),
-            None,
-            // a row group having all pages with some nulls
-            None,
-            Some(8),
-            Some(9),
-            None,
-        ]));
-
-        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
-
-        let writer_props_builder = match write_page_index {
-            true => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Page),
-            false => WriterProperties::builder().set_statistics_enabled(EnabledStatistics::Chunk),
-        };
-
-        // tune the size or pages to the data above
-        // to make sure we exercise code paths where all items in a page are null, etc.
-        let writer_props = writer_props_builder
-            .set_max_row_group_size(4)
-            .set_data_page_row_count_limit(2)
-            .set_write_batch_size(2)
-            .build();
-
-        let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(writer_props)).unwrap();
-        writer.write(&batch).unwrap();
-        writer.close().unwrap();
-
-        let data = buf.into_inner().freeze();
-
-        let reader_opts = match read_page_index {
-            true => ReadOptionsBuilder::new().with_page_index().build(),
-            false => ReadOptionsBuilder::new().build(),
-        };
-        let reader = SerializedFileReader::new_with_options(data.clone(), reader_opts).unwrap();
-        let metadata = reader.metadata().clone();
-        TestMetadata {
-            file_size: data.len(),
-            metadata,
-        }
-    }
-
-    /// Temporary function so we can test loading metadata with page indexes
-    /// while we haven't fully figured out how to load it cleanly
-    async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData {
-        use crate::arrow::async_reader::MetadataFetch;
-        use crate::errors::Result as ParquetResult;
-        use futures::future::BoxFuture;
-        use futures::FutureExt;
-        use std::ops::Range;
-
-        /// Adapt a `Bytes` to a `MetadataFetch` implementation.
-        struct AsyncBytes {
-            data: Bytes,
-        }
-
-        impl AsyncBytes {
-            fn new(data: Bytes) -> Self {
-                Self { data }
-            }
-        }
-
-        impl MetadataFetch for AsyncBytes {
-            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
-                async move { Ok(self.data.slice(range.start..range.end)) }.boxed()
-            }
-        }
-
-        /// A `MetadataFetch` implementation that reads from a subset of the full data
-        /// while accepting ranges that address the full data.
-        struct MaskedBytes {
-            inner: Box<dyn MetadataFetch + Send>,
-            inner_range: Range<usize>,
-        }
-
-        impl MaskedBytes {
-            fn new(inner: Box<dyn MetadataFetch + Send>, inner_range: Range<usize>) -> Self {
-                Self { inner, inner_range }
-            }
-        }
-
-        impl MetadataFetch for &mut MaskedBytes {
-            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, ParquetResult<Bytes>> {
-                let inner_range = self.inner_range.clone();
-                println!("inner_range: {:?}", inner_range);
-                println!("range: {:?}", range);
-                assert!(inner_range.start <= range.start && inner_range.end >= range.end);
-                let range =
-                    range.start - self.inner_range.start..range.end - self.inner_range.start;
-                self.inner.fetch(range)
-            }
-        }
-
-        let metadata_length = data.len();
-        let mut reader = MaskedBytes::new(
-            Box::new(AsyncBytes::new(data)),
-            file_size - metadata_length..file_size,
-        );
-        ParquetMetaDataReader::new()
-            .with_page_indexes(true)
-            .load_and_finish(&mut reader, file_size)
-            .await
-            .unwrap()
-    }
-
-    fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) {
-        assert_eq!(left.column_descr(), right.column_descr());
-        assert_eq!(left.encodings(), right.encodings());
-        assert_eq!(left.num_values(), right.num_values());
-        assert_eq!(left.compressed_size(), right.compressed_size());
-        assert_eq!(left.data_page_offset(), right.data_page_offset());
-        assert_eq!(left.statistics(), right.statistics());
-        assert_eq!(left.offset_index_length(), right.offset_index_length());
-        assert_eq!(left.column_index_length(), right.column_index_length());
-        assert_eq!(
-            left.unencoded_byte_array_data_bytes(),
-            right.unencoded_byte_array_data_bytes()
-        );
-    }
-
-    fn check_row_groups_are_equivalent(left: &RowGroupMetaData, right: &RowGroupMetaData) {
-        assert_eq!(left.num_rows(), right.num_rows());
-        assert_eq!(left.file_offset(), right.file_offset());
-        assert_eq!(left.total_byte_size(), right.total_byte_size());
-        assert_eq!(left.schema_descr(), right.schema_descr());
-        assert_eq!(left.num_columns(), right.num_columns());
-        left.columns()
-            .iter()
-            .zip(right.columns().iter())
-            .for_each(|(lc, rc)| {
-                check_columns_are_equivalent(lc, rc);
-            });
-    }
-
-    #[tokio::test]
-    async fn test_encode_parquet_metadata_with_page_index() {
-        // Create a ParquetMetadata with page index information
-        let metadata = get_test_metadata(true, true);
-        assert!(has_page_index(&metadata.metadata));
-
-        let mut buf = BytesMut::new().writer();
-        {
-            let writer = ParquetMetaDataWriter::new(&mut buf, &metadata.metadata);
-            writer.finish().unwrap();
-        }
-
-        let data = buf.into_inner().freeze();
-
-        let decoded_metadata = load_metadata_from_bytes(data.len(), data).await;
-
-        // Because the page index offsets will differ, compare invariant parts of the metadata
-        assert_eq!(
-            metadata.metadata.file_metadata(),
-            decoded_metadata.file_metadata()
-        );
-        assert_eq!(
-            metadata.metadata.column_index(),
-            decoded_metadata.column_index()
-        );
-        assert_eq!(
-            metadata.metadata.offset_index(),
-            decoded_metadata.offset_index()
-        );
-        assert_eq!(
-            metadata.metadata.num_row_groups(),
-            decoded_metadata.num_row_groups()
-        );
-
-        // check that the mins and maxes are what we expect for each page
-        // also indirectly checking that the pages were written out as we expected them to be laid out
-        // (if they're not, or something gets refactored in the future that breaks that assumption,
-        // this test may have to drop down to a lower level and create metadata directly instead of relying on
-        // writing an entire file)
-        let column_indexes = metadata.metadata.column_index().unwrap();
-        assert_eq!(column_indexes.len(), 6);
-        // make sure each row group has 2 pages by checking the first column
-        // page counts for each column for each row group, should all be the same and there should be
-        // 12 pages in total across 6 row groups / 1 column
-        let mut page_counts = vec![];
-        for row_group in column_indexes {
-            for column in row_group {
-                match column {
-                    Index::INT32(column_index) => {
-                        page_counts.push(column_index.indexes.len());
-                    }
-                    _ => panic!("unexpected column index type"),
-                }
-            }
-        }
-        assert_eq!(page_counts, vec![2; 6]);
-
-        metadata
-            .metadata
-            .row_groups()
-            .iter()
-            .zip(decoded_metadata.row_groups().iter())
-            .for_each(|(left, right)| {
-                check_row_groups_are_equivalent(left, right);
-            });
-    }
-}