// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Tests that the ArrowWriter correctly lays out values into multiple pages

use arrow::array::{Int32Array, StringArray};
use arrow::record_batch::RecordBatch;
use arrow_array::builder::{Int32Builder, ListBuilder};
use bytes::Bytes;
use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Encoding, PageType};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::{ReaderProperties, WriterProperties};
use parquet::file::reader::SerializedPageReader;
use std::sync::Arc;
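
/// Expected layout of an entire Parquet file, one entry per row group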
struct Layout {
row_groups: Vec<RowGroup>,
}
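
/// Expected layout of a single row group, one entry per column chunk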
struct RowGroup {
columns: Vec<ColumnChunk>,
}
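
/// Expected layout of a single column chunk: its data pages and optional dictionary page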
struct ColumnChunk {
pages: Vec<Page>,
dictionary_page: Option<Page>,
}
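
/// Expected properties of a single page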
struct Page {
    /// Number of rows in the page
    rows: usize,
    /// Compressed size of the page data, excluding the page header
    compressed_size: usize,
    /// Size of the Thrift page header in bytes
    page_header_size: usize,
    /// Expected encoding of the page data
    encoding: Encoding,
    /// Expected page type (data or dictionary)
    page_type: PageType,
}
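
/// A complete test case: writer properties, input batches, and the expected file layout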
struct LayoutTest {
props: WriterProperties,
batches: Vec<RecordBatch>,
layout: Layout,
}
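
/// Writes `test.batches` with `test.props` and asserts the resulting file matches `test.layout`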
fn do_test(test: LayoutTest) {
let mut buf = Vec::with_capacity(1024);
let mut writer =
ArrowWriter::try_new(&mut buf, test.batches[0].schema(), Some(test.props)).unwrap();
for batch in test.batches {
writer.write(&batch).unwrap();
}
writer.close().unwrap();
let b = Bytes::from(buf);
    // Re-read the file with the page index enabled so the offset index can be checked
let read_options = ArrowReaderOptions::new().with_page_index(true);
let reader =
ParquetRecordBatchReaderBuilder::try_new_with_options(b.clone(), read_options).unwrap();
assert_layout(&b, reader.metadata().as_ref(), &test.layout);
}
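
/// Asserts that the file's offset index and decoded pages match the expected `layout`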
fn assert_layout(file_reader: &Bytes, meta: &ParquetMetaData, layout: &Layout) {
assert_eq!(meta.row_groups().len(), layout.row_groups.len());
let iter = meta
.row_groups()
.iter()
.zip(&layout.row_groups)
.zip(meta.offset_index().unwrap());
for ((row_group, row_group_layout), offset_index) in iter {
// Check against offset index
assert_eq!(offset_index.len(), row_group_layout.columns.len());
for (column_index, column_layout) in offset_index.iter().zip(&row_group_layout.columns) {
assert_eq!(
column_index.len(),
column_layout.pages.len(),
"index page count mismatch"
);
for (idx, (page, page_layout)) in
column_index.iter().zip(&column_layout.pages).enumerate()
{
assert_eq!(
page.compressed_page_size as usize,
page_layout.compressed_size + page_layout.page_header_size,
"index page {idx} size mismatch"
);
let next_first_row_index = column_index
.get(idx + 1)
.map(|x| x.first_row_index)
.unwrap_or_else(|| row_group.num_rows());
let num_rows = next_first_row_index - page.first_row_index;
assert_eq!(
num_rows as usize, page_layout.rows,
"index page {idx} row count"
);
}
}
// Check against page data
assert_eq!(
row_group.columns().len(),
row_group_layout.columns.len(),
"column count mismatch"
);
let iter = row_group
.columns()
.iter()
.zip(&row_group_layout.columns)
.enumerate();
for (idx, (column, column_layout)) in iter {
let properties = ReaderProperties::builder()
.set_backward_compatible_lz4(false)
.build();
let page_reader = SerializedPageReader::new_with_properties(
Arc::new(file_reader.clone()),
column,
row_group.num_rows() as usize,
None,
Arc::new(properties),
)
.unwrap();
let pages = page_reader.collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(
pages.len(),
column_layout.pages.len() + column_layout.dictionary_page.is_some() as usize,
"page {idx} count mismatch"
);
let page_layouts = column_layout
.dictionary_page
.iter()
.chain(&column_layout.pages);
for (page, page_layout) in pages.iter().zip(page_layouts) {
assert_eq!(page.encoding(), page_layout.encoding);
assert_eq!(
page.buffer().len(),
page_layout.compressed_size,
"page {idx} size mismatch"
);
assert_eq!(page.page_type(), page_layout.page_type);
}
}
}
}

#[test]
fn test_primitive() {
let array = Arc::new(Int32Array::from_iter_values(0..2000)) as _;
let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
let props = WriterProperties::builder()
.set_dictionary_enabled(false)
.set_data_page_size_limit(1000)
.set_write_batch_size(10)
.build();
    // Test spilling plain-encoded pages: 2000 i32 values at 4 bytes each with a
    // 1000 byte data page size limit yields 8 pages of 250 rows
do_test(LayoutTest {
props,
batches: vec![batch.clone()],
layout: Layout {
row_groups: vec![RowGroup {
columns: vec![ColumnChunk {
pages: (0..8)
.map(|_| Page {
rows: 250,
page_header_size: 36,
compressed_size: 1000,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
})
.collect(),
dictionary_page: None,
}],
}],
},
});
    // Test spilling the dictionary: the 1000 byte dictionary page size limit is reached
    // after 250 distinct 4 byte values, after which the writer falls back to plain
    // encoding for the remaining 1750 rows
let props = WriterProperties::builder()
.set_dictionary_enabled(true)
.set_dictionary_page_size_limit(1000)
.set_data_page_size_limit(10000)
.set_write_batch_size(10)
.build();
do_test(LayoutTest {
props,
batches: vec![batch.clone()],
layout: Layout {
row_groups: vec![RowGroup {
columns: vec![ColumnChunk {
pages: vec![
Page {
rows: 250,
page_header_size: 36,
compressed_size: 258,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 1750,
page_header_size: 36,
compressed_size: 7000,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
},
],
dictionary_page: Some(Page {
rows: 250,
page_header_size: 36,
compressed_size: 1000,
encoding: Encoding::PLAIN,
page_type: PageType::DICTIONARY_PAGE,
}),
}],
}],
},
});
    // Test spilling dictionary-encoded data pages: all 2000 distinct values (8000 bytes)
    // fit under the 10000 byte dictionary limit, so every data page stays RLE_DICTIONARY
    // and is split by the 500 byte data page size limit
let props = WriterProperties::builder()
.set_dictionary_enabled(true)
.set_dictionary_page_size_limit(10000)
.set_data_page_size_limit(500)
.set_write_batch_size(10)
.build();
do_test(LayoutTest {
props,
batches: vec![batch.clone()],
layout: Layout {
row_groups: vec![RowGroup {
columns: vec![ColumnChunk {
pages: vec![
Page {
rows: 400,
page_header_size: 36,
compressed_size: 452,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 370,
page_header_size: 36,
compressed_size: 472,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 240,
page_header_size: 36,
compressed_size: 332,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
],
dictionary_page: Some(Page {
rows: 2000,
page_header_size: 36,
compressed_size: 8000,
encoding: Encoding::PLAIN,
page_type: PageType::DICTIONARY_PAGE,
}),
}],
}],
},
});
    // Test the data page row count limit: 2000 rows with a 100 row limit gives 20 pages
let props = WriterProperties::builder()
.set_dictionary_enabled(false)
.set_data_page_row_count_limit(100)
.set_write_batch_size(100)
.build();
do_test(LayoutTest {
props,
batches: vec![batch],
layout: Layout {
row_groups: vec![RowGroup {
columns: vec![ColumnChunk {
pages: (0..20)
.map(|_| Page {
rows: 100,
page_header_size: 36,
compressed_size: 400,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
})
.collect(),
dictionary_page: None,
}],
}],
},
});
}

#[test]
fn test_string() {
let array = Arc::new(StringArray::from_iter_values(
(0..2000).map(|x| format!("{x:04}")),
)) as _;
let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
let props = WriterProperties::builder()
.set_dictionary_enabled(false)
.set_data_page_size_limit(1000)
.set_write_batch_size(10)
.build();
    // Test spilling plain-encoded pages: each 4 character string is stored as a 4 byte
    // length prefix plus 4 bytes of data, so pages spill at 130 rows (1040 bytes),
    // giving 15 full pages plus a final page of 50 rows
do_test(LayoutTest {
props,
batches: vec![batch.clone()],
layout: Layout {
row_groups: vec![RowGroup {
columns: vec![ColumnChunk {
pages: (0..15)
.map(|_| Page {
rows: 130,
page_header_size: 36,
compressed_size: 1040,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
})
.chain(std::iter::once(Page {
rows: 50,
page_header_size: 35,
compressed_size: 400,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
}))
.collect(),
dictionary_page: None,
}],
}],
},
});
    // Test spilling the dictionary: the 1000 byte dictionary page size limit is exceeded
    // after 130 distinct 8 byte entries, after which the remaining rows fall back to
    // plain encoding
let props = WriterProperties::builder()
.set_dictionary_enabled(true)
.set_dictionary_page_size_limit(1000)
.set_data_page_size_limit(10000)
.set_write_batch_size(10)
.build();
do_test(LayoutTest {
props,
batches: vec![batch.clone()],
layout: Layout {
row_groups: vec![RowGroup {
columns: vec![ColumnChunk {
pages: vec![
Page {
rows: 130,
page_header_size: 36,
compressed_size: 138,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 1250,
page_header_size: 38,
compressed_size: 10000,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 620,
page_header_size: 36,
compressed_size: 4960,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
},
],
dictionary_page: Some(Page {
rows: 130,
page_header_size: 36,
compressed_size: 1040,
encoding: Encoding::PLAIN,
page_type: PageType::DICTIONARY_PAGE,
}),
}],
}],
},
});
    // Test spilling dictionary-encoded data pages: the full 16000 byte dictionary fits
    // under the 20000 byte limit, so all data pages stay RLE_DICTIONARY and are split
    // by the 500 byte data page size limit
let props = WriterProperties::builder()
.set_dictionary_enabled(true)
.set_dictionary_page_size_limit(20000)
.set_data_page_size_limit(500)
.set_write_batch_size(10)
.build();
do_test(LayoutTest {
props,
batches: vec![batch],
layout: Layout {
row_groups: vec![RowGroup {
columns: vec![ColumnChunk {
pages: vec![
Page {
rows: 400,
page_header_size: 36,
compressed_size: 452,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 370,
page_header_size: 36,
compressed_size: 472,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 330,
page_header_size: 36,
compressed_size: 464,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
Page {
rows: 240,
page_header_size: 36,
compressed_size: 332,
encoding: Encoding::RLE_DICTIONARY,
page_type: PageType::DATA_PAGE,
},
],
dictionary_page: Some(Page {
rows: 2000,
page_header_size: 36,
compressed_size: 16000,
encoding: Encoding::PLAIN,
page_type: PageType::DICTIONARY_PAGE,
}),
}],
}],
},
});
}

#[test]
fn test_list() {
let mut list = ListBuilder::new(Int32Builder::new());
for _ in 0..200 {
let values = list.values();
for i in 0..8 {
values.append_value(i);
}
list.append(true);
}
let array = Arc::new(list.finish()) as _;
let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
let props = WriterProperties::builder()
.set_dictionary_enabled(false)
.set_data_page_row_count_limit(20)
.set_write_batch_size(3)
.build();
    // Test that list rows are not split across pages: with a 20 row page limit, each
    // data page holds 20 complete lists (160 values)
do_test(LayoutTest {
props,
batches: vec![batch],
layout: Layout {
row_groups: vec![RowGroup {
columns: vec![ColumnChunk {
pages: (0..10)
.map(|_| Page {
rows: 20,
page_header_size: 36,
compressed_size: 672,
encoding: Encoding::PLAIN,
page_type: PageType::DATA_PAGE,
})
.collect(),
dictionary_page: None,
}],
}],
},
});
}