// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Contains the file writer API, and provides methods to write row groups and columns by
//! using row group writers and column writers respectively.
use std::{
io::{Seek, SeekFrom, Write},
rc::Rc,
};
use byteorder::{ByteOrder, LittleEndian};
use parquet_format as parquet;
use thrift::protocol::{TCompactOutputProtocol, TOutputProtocol};
use crate::basic::PageType;
use crate::column::{
page::{CompressedPage, Page, PageWriteSpec, PageWriter},
writer::{get_column_writer, ColumnWriter},
};
use crate::errors::{ParquetError, Result};
use crate::file::{
metadata::*, properties::WriterPropertiesPtr, reader::TryClone,
statistics::to_thrift as statistics_to_thrift, FOOTER_SIZE, PARQUET_MAGIC,
};
use crate::schema::types::{self, SchemaDescPtr, SchemaDescriptor, TypePtr};
use crate::util::io::{FileSink, Position};
// ----------------------------------------------------------------------
// APIs for file & row group writers
/// Parquet file writer API.
/// Provides methods to write row groups sequentially.
///
/// The main workflow is as follows:
/// - Create a file writer; this will open a new file and potentially write some metadata.
/// - Request a new row group writer by calling `next_row_group`.
/// - Once finished writing a row group, close the row group writer by passing it into
/// the `close_row_group` method - this will finalise row group metadata and update metrics.
/// - Write subsequent row groups, if necessary.
/// - After all row groups have been written, close the file writer using the `close` method.
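///
/// A minimal sketch of this workflow, writing a single row group with one
/// `INT32` column (the output path, schema string and values below are
/// illustrative only):
///
/// ```no_run
/// use std::{fs, rc::Rc};
///
/// use parquet::column::writer::ColumnWriter;
/// use parquet::file::{
///     properties::WriterProperties,
///     writer::{FileWriter, RowGroupWriter, SerializedFileWriter},
/// };
/// use parquet::schema::parser::parse_message_type;
///
/// let schema = Rc::new(
///     parse_message_type("message schema { REQUIRED INT32 col1; }").unwrap(),
/// );
/// let props = Rc::new(WriterProperties::builder().build());
/// let file = fs::File::create("sample.parquet").unwrap();
///
/// let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
/// let mut row_group_writer = writer.next_row_group().unwrap();
/// while let Some(mut col_writer) = row_group_writer.next_column().unwrap() {
///     if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer {
///         typed.write_batch(&[1, 2, 3], None, None).unwrap();
///     }
///     row_group_writer.close_column(col_writer).unwrap();
/// }
/// writer.close_row_group(row_group_writer).unwrap();
/// writer.close().unwrap();
/// ```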
pub trait FileWriter {
/// Creates a new row group from this file writer.
/// Returns `Err` in case of an IO or Thrift error.
///
/// There is no limit on the number of row groups in a file; however, row groups have
/// to be written sequentially. Every time the next row group is requested, the
/// previous row group must be finalised and closed using the `close_row_group` method.
fn next_row_group(&mut self) -> Result<Box<RowGroupWriter>>;
/// Finalises and closes the row group that was created using the `next_row_group`
/// method.
/// After calling this method, the next row group is available for writes.
fn close_row_group(&mut self, row_group_writer: Box<RowGroupWriter>) -> Result<()>;
/// Closes and finalises the file writer.
///
/// All row groups must be appended before this method is called.
/// No writes are allowed after this point.
///
/// Can be called multiple times. It is up to the implementation to either make
/// subsequent calls a no-op or return an `Err`.
fn close(&mut self) -> Result<()>;
}
/// Parquet row group writer API.
/// Provides methods to access column writers in an iterator-like fashion; the order is
/// guaranteed to match the order of schema leaves (column descriptors).
///
/// All columns should be written sequentially; the main workflow is:
/// - Request the next column using the `next_column` method - this will return `None`
/// if no more columns are available to write.
/// - Once done writing a column, close the column writer with the `close_column`
/// method - this will finalise column chunk metadata and update row group metrics.
/// - Once all columns have been written, close the row group writer with the `close`
/// method - it will return the row group metadata and is a no-op on an already closed
/// row group.
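///
/// A short sketch of that loop (a fragment, assuming `row_group_writer` was obtained
/// from `FileWriter::next_row_group` and the schema has `INT32` leaves):
///
/// ```ignore
/// while let Some(mut col_writer) = row_group_writer.next_column()? {
///     if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer {
///         typed.write_batch(&[1, 2, 3], None, None)?;
///     }
///     row_group_writer.close_column(col_writer)?;
/// }
/// let row_group_metadata = row_group_writer.close()?;
/// ```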
pub trait RowGroupWriter {
/// Returns the next column writer, if available; otherwise returns `None`.
/// In case of an IO or Thrift error, or if the row group writer has already been
/// closed, returns `Err`.
///
/// To request the next column writer, the previous one must be finalised and closed
/// using `close_column`.
fn next_column(&mut self) -> Result<Option<ColumnWriter>>;
/// Closes the column writer that was created using the `next_column` method.
/// This should be called before requesting the next column writer.
fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()>;
/// Closes this row group writer and returns row group metadata.
/// After calling this method the row group writer must not be used.
///
/// It is recommended to call this method explicitly, but the row group will also be
/// closed automatically when it is passed to the file writer's `close_row_group`
/// method.
///
/// Can be called multiple times; subsequent calls are a no-op and return the already
/// created row group metadata.
fn close(&mut self) -> Result<RowGroupMetaDataPtr>;
}
// ----------------------------------------------------------------------
// Serialized impl for file & row group writers
pub trait ParquetWriter: Write + Seek + TryClone {}
impl<T: Write + Seek + TryClone> ParquetWriter for T {}
/// A serialized implementation for Parquet [`FileWriter`].
/// See documentation on file writer for more information.
pub struct SerializedFileWriter<W: ParquetWriter> {
buf: W,
schema: TypePtr,
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
total_num_rows: i64,
row_groups: Vec<RowGroupMetaDataPtr>,
previous_writer_closed: bool,
is_closed: bool,
}
impl<W: ParquetWriter> SerializedFileWriter<W> {
/// Creates a new file writer.
pub fn new(
mut buf: W,
schema: TypePtr,
properties: WriterPropertiesPtr,
) -> Result<Self> {
Self::start_file(&mut buf)?;
Ok(Self {
buf,
schema: schema.clone(),
descr: Rc::new(SchemaDescriptor::new(schema)),
props: properties,
total_num_rows: 0,
row_groups: Vec::new(),
previous_writer_closed: true,
is_closed: false,
})
}
/// Writes magic bytes at the beginning of the file.
fn start_file(buf: &mut W) -> Result<()> {
buf.write_all(&PARQUET_MAGIC)?;
Ok(())
}
/// Finalises the given row group writer and updates the file writer's metrics.
fn finalise_row_group_writer(
&mut self,
mut row_group_writer: Box<RowGroupWriter>,
) -> Result<()> {
let row_group_metadata = row_group_writer.close()?;
self.total_num_rows += row_group_metadata.num_rows();
self.row_groups.push(row_group_metadata);
Ok(())
}
/// Assembles and writes the file metadata and footer at the end of the file.
fn write_metadata(&mut self) -> Result<()> {
let file_metadata = parquet::FileMetaData {
version: self.props.writer_version().as_num(),
schema: types::to_thrift(self.schema.as_ref())?,
num_rows: self.total_num_rows as i64,
row_groups: self.row_groups.iter().map(|v| v.to_thrift()).collect(),
key_value_metadata: None,
created_by: Some(self.props.created_by().to_owned()),
column_orders: None,
};
// Write file metadata
let start_pos = self.buf.seek(SeekFrom::Current(0))?;
{
let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
file_metadata.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
}
let end_pos = self.buf.seek(SeekFrom::Current(0))?;
// Write footer
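// The footer consists of the metadata length as a 4-byte little-endian integer,
// followed by the 4-byte `PAR1` magic.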
let mut footer_buffer: [u8; FOOTER_SIZE] = [0; FOOTER_SIZE];
let metadata_len = (end_pos - start_pos) as i32;
LittleEndian::write_i32(&mut footer_buffer, metadata_len);
(&mut footer_buffer[4..]).write_all(&PARQUET_MAGIC)?;
self.buf.write_all(&footer_buffer)?;
Ok(())
}
#[inline]
fn assert_closed(&self) -> Result<()> {
if self.is_closed {
Err(general_err!("File writer is closed"))
} else {
Ok(())
}
}
#[inline]
fn assert_previous_writer_closed(&self) -> Result<()> {
if !self.previous_writer_closed {
Err(general_err!("Previous row group writer was not closed"))
} else {
Ok(())
}
}
}
impl<W: 'static + ParquetWriter> FileWriter for SerializedFileWriter<W> {
#[inline]
fn next_row_group(&mut self) -> Result<Box<RowGroupWriter>> {
self.assert_closed()?;
self.assert_previous_writer_closed()?;
let row_group_writer = SerializedRowGroupWriter::new(
self.descr.clone(),
self.props.clone(),
&self.buf,
);
self.previous_writer_closed = false;
Ok(Box::new(row_group_writer))
}
#[inline]
fn close_row_group(&mut self, row_group_writer: Box<RowGroupWriter>) -> Result<()> {
self.assert_closed()?;
let res = self.finalise_row_group_writer(row_group_writer);
self.previous_writer_closed = res.is_ok();
res
}
#[inline]
fn close(&mut self) -> Result<()> {
self.assert_closed()?;
self.assert_previous_writer_closed()?;
self.write_metadata()?;
self.is_closed = true;
Ok(())
}
}
/// A serialized implementation for Parquet [`RowGroupWriter`].
/// Coordinates writing of a row group with column writers.
/// See documentation on row group writer for more information.
pub struct SerializedRowGroupWriter<W: ParquetWriter> {
descr: SchemaDescPtr,
props: WriterPropertiesPtr,
buf: W,
total_rows_written: Option<u64>,
total_bytes_written: u64,
column_index: usize,
previous_writer_closed: bool,
row_group_metadata: Option<RowGroupMetaDataPtr>,
column_chunks: Vec<ColumnChunkMetaData>,
}
impl<W: 'static + ParquetWriter> SerializedRowGroupWriter<W> {
pub fn new(
schema_descr: SchemaDescPtr,
properties: WriterPropertiesPtr,
buf: &W,
) -> Self {
let num_columns = schema_descr.num_columns();
Self {
descr: schema_descr,
props: properties,
buf: buf.try_clone().unwrap(),
total_rows_written: None,
total_bytes_written: 0,
column_index: 0,
previous_writer_closed: true,
row_group_metadata: None,
column_chunks: Vec::with_capacity(num_columns),
}
}
/// Checks and finalises the current column writer.
fn finalise_column_writer(&mut self, writer: ColumnWriter) -> Result<()> {
let (bytes_written, rows_written, metadata) = match writer {
ColumnWriter::BoolColumnWriter(typed) => typed.close()?,
ColumnWriter::Int32ColumnWriter(typed) => typed.close()?,
ColumnWriter::Int64ColumnWriter(typed) => typed.close()?,
ColumnWriter::Int96ColumnWriter(typed) => typed.close()?,
ColumnWriter::FloatColumnWriter(typed) => typed.close()?,
ColumnWriter::DoubleColumnWriter(typed) => typed.close()?,
ColumnWriter::ByteArrayColumnWriter(typed) => typed.close()?,
ColumnWriter::FixedLenByteArrayColumnWriter(typed) => typed.close()?,
};
// Update row group writer metrics
self.total_bytes_written += bytes_written;
self.column_chunks.push(metadata);
if let Some(rows) = self.total_rows_written {
if rows != rows_written {
return Err(general_err!(
"Incorrect number of rows, expected {} != {} rows",
rows,
rows_written
));
}
} else {
self.total_rows_written = Some(rows_written);
}
Ok(())
}
#[inline]
fn assert_closed(&self) -> Result<()> {
if self.row_group_metadata.is_some() {
Err(general_err!("Row group writer is closed"))
} else {
Ok(())
}
}
#[inline]
fn assert_previous_writer_closed(&self) -> Result<()> {
if !self.previous_writer_closed {
Err(general_err!("Previous column writer was not closed"))
} else {
Ok(())
}
}
}
impl<W: 'static + ParquetWriter> RowGroupWriter for SerializedRowGroupWriter<W> {
#[inline]
fn next_column(&mut self) -> Result<Option<ColumnWriter>> {
self.assert_closed()?;
self.assert_previous_writer_closed()?;
if self.column_index >= self.descr.num_columns() {
return Ok(None);
}
let sink = FileSink::new(&self.buf);
let page_writer = Box::new(SerializedPageWriter::new(sink));
let column_writer = get_column_writer(
self.descr.column(self.column_index),
self.props.clone(),
page_writer,
);
self.column_index += 1;
self.previous_writer_closed = false;
Ok(Some(column_writer))
}
#[inline]
fn close_column(&mut self, column_writer: ColumnWriter) -> Result<()> {
let res = self.finalise_column_writer(column_writer);
self.previous_writer_closed = res.is_ok();
res
}
#[inline]
fn close(&mut self) -> Result<RowGroupMetaDataPtr> {
if self.row_group_metadata.is_none() {
self.assert_previous_writer_closed()?;
let column_chunks = std::mem::replace(&mut self.column_chunks, vec![]);
let row_group_metadata = RowGroupMetaData::builder(self.descr.clone())
.set_column_metadata(column_chunks)
.set_total_byte_size(self.total_bytes_written as i64)
.set_num_rows(self.total_rows_written.unwrap_or(0) as i64)
.build()?;
self.row_group_metadata = Some(Rc::new(row_group_metadata));
}
let metadata = self.row_group_metadata.as_ref().unwrap().clone();
Ok(metadata)
}
}
/// A serialized implementation for Parquet [`PageWriter`].
/// Writes and serializes pages and metadata into the output stream.
///
/// `SerializedPageWriter` should not be used after calling `close()`.
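///
/// A minimal sketch of writing already compressed pages into an in-memory buffer
/// (this mirrors the round-trip test in this module; `compressed_pages` is assumed to
/// be a `Vec<CompressedPage>` and the `PageWriter` trait to be in scope):
///
/// ```ignore
/// let mut buffer: Vec<u8> = vec![];
/// {
///     let cursor = std::io::Cursor::new(&mut buffer);
///     let mut page_writer = SerializedPageWriter::new(cursor);
///     for page in compressed_pages {
///         page_writer.write_page(page)?;
///     }
///     page_writer.close()?;
/// }
/// // `buffer` now holds the serialized page headers followed by the page data.
/// ```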
pub struct SerializedPageWriter<T: Write + Position> {
sink: T,
}
impl<T: Write + Position> SerializedPageWriter<T> {
/// Creates a new page writer.
pub fn new(sink: T) -> Self {
Self { sink }
}
/// Serializes the page header into Thrift.
/// Returns the number of bytes that have been written into the sink.
#[inline]
fn serialize_page_header(&mut self, header: parquet::PageHeader) -> Result<usize> {
let start_pos = self.sink.pos();
{
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
header.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
}
Ok((self.sink.pos() - start_pos) as usize)
}
/// Serializes the column chunk metadata into Thrift.
/// Returns `Ok(())` if there are no errors serializing and writing the data into the sink.
#[inline]
fn serialize_column_chunk(&mut self, chunk: parquet::ColumnChunk) -> Result<()> {
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
chunk.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
Ok(())
}
}
impl<T: Write + Position> PageWriter for SerializedPageWriter<T> {
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec> {
let uncompressed_size = page.uncompressed_size();
let compressed_size = page.compressed_size();
let num_values = page.num_values();
let encoding = page.encoding();
let page_type = page.page_type();
let mut page_header = parquet::PageHeader {
type_: page_type.into(),
uncompressed_page_size: uncompressed_size as i32,
compressed_page_size: compressed_size as i32,
// TODO: Add support for crc checksum
crc: None,
data_page_header: None,
index_page_header: None,
dictionary_page_header: None,
data_page_header_v2: None,
};
match page.compressed_page() {
&Page::DataPage {
def_level_encoding,
rep_level_encoding,
ref statistics,
..
} => {
let data_page_header = parquet::DataPageHeader {
num_values: num_values as i32,
encoding: encoding.into(),
definition_level_encoding: def_level_encoding.into(),
repetition_level_encoding: rep_level_encoding.into(),
statistics: statistics_to_thrift(statistics.as_ref()),
};
page_header.data_page_header = Some(data_page_header);
}
&Page::DataPageV2 {
num_nulls,
num_rows,
def_levels_byte_len,
rep_levels_byte_len,
is_compressed,
ref statistics,
..
} => {
let data_page_header_v2 = parquet::DataPageHeaderV2 {
num_values: num_values as i32,
num_nulls: num_nulls as i32,
num_rows: num_rows as i32,
encoding: encoding.into(),
definition_levels_byte_length: def_levels_byte_len as i32,
repetition_levels_byte_length: rep_levels_byte_len as i32,
is_compressed: Some(is_compressed),
statistics: statistics_to_thrift(statistics.as_ref()),
};
page_header.data_page_header_v2 = Some(data_page_header_v2);
}
&Page::DictionaryPage { is_sorted, .. } => {
let dictionary_page_header = parquet::DictionaryPageHeader {
num_values: num_values as i32,
encoding: encoding.into(),
is_sorted: Some(is_sorted),
};
page_header.dictionary_page_header = Some(dictionary_page_header);
}
}
let start_pos = self.sink.pos();
let header_size = self.serialize_page_header(page_header)?;
self.sink.write_all(page.data())?;
let mut spec = PageWriteSpec::new();
spec.page_type = page_type;
spec.uncompressed_size = uncompressed_size + header_size;
spec.compressed_size = compressed_size + header_size;
spec.offset = start_pos;
spec.bytes_written = self.sink.pos() - start_pos;
// Number of values is set for data pages only
if page_type == PageType::DATA_PAGE || page_type == PageType::DATA_PAGE_V2 {
spec.num_values = num_values;
}
Ok(spec)
}
fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
self.serialize_column_chunk(metadata.to_thrift())
}
fn close(&mut self) -> Result<()> {
self.sink.flush()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::{fs::File, io::Cursor};
use crate::basic::{Compression, Encoding, Repetition, Type};
use crate::column::page::PageReader;
use crate::compression::{create_codec, Codec};
use crate::file::{
properties::WriterProperties,
reader::{FileReader, SerializedFileReader, SerializedPageReader},
statistics::{from_thrift, to_thrift, Statistics},
};
use crate::record::RowAccessor;
use crate::util::{memory::ByteBufferPtr, test_common::get_temp_file};
#[test]
fn test_file_writer_error_after_close() {
let file = get_temp_file("test_file_writer_error_after_close", &[]);
let schema = Rc::new(types::Type::group_type_builder("schema").build().unwrap());
let props = Rc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
writer.close().unwrap();
{
let res = writer.next_row_group();
assert!(res.is_err());
if let Err(err) = res {
assert_eq!(format!("{}", err), "Parquet error: File writer is closed");
}
}
{
let res = writer.close();
assert!(res.is_err());
if let Err(err) = res {
assert_eq!(format!("{}", err), "Parquet error: File writer is closed");
}
}
}
#[test]
fn test_row_group_writer_error_after_close() {
let file = get_temp_file("test_file_writer_row_group_error_after_close", &[]);
let schema = Rc::new(types::Type::group_type_builder("schema").build().unwrap());
let props = Rc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
let mut row_group_writer = writer.next_row_group().unwrap();
row_group_writer.close().unwrap();
let res = row_group_writer.next_column();
assert!(res.is_err());
if let Err(err) = res {
assert_eq!(
format!("{}", err),
"Parquet error: Row group writer is closed"
);
}
}
#[test]
fn test_row_group_writer_error_not_all_columns_written() {
let file =
get_temp_file("test_row_group_writer_error_not_all_columns_written", &[]);
let schema = Rc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![Rc::new(
types::Type::primitive_type_builder("col1", Type::INT32)
.build()
.unwrap(),
)])
.build()
.unwrap(),
);
let props = Rc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
let mut row_group_writer = writer.next_row_group().unwrap();
let res = row_group_writer.close();
assert!(res.is_err());
if let Err(err) = res {
assert_eq!(
format!("{}", err),
"Parquet error: Column length mismatch: 1 != 0"
);
}
}
#[test]
fn test_row_group_writer_num_records_mismatch() {
let file = get_temp_file("test_row_group_writer_num_records_mismatch", &[]);
let schema = Rc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![
Rc::new(
types::Type::primitive_type_builder("col1", Type::INT32)
.with_repetition(Repetition::REQUIRED)
.build()
.unwrap(),
),
Rc::new(
types::Type::primitive_type_builder("col2", Type::INT32)
.with_repetition(Repetition::REQUIRED)
.build()
.unwrap(),
),
])
.build()
.unwrap(),
);
let props = Rc::new(WriterProperties::builder().build());
let mut writer = SerializedFileWriter::new(file, schema, props).unwrap();
let mut row_group_writer = writer.next_row_group().unwrap();
let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer {
typed.write_batch(&[1, 2, 3], None, None).unwrap();
}
row_group_writer.close_column(col_writer).unwrap();
let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
if let ColumnWriter::Int32ColumnWriter(ref mut typed) = col_writer {
typed.write_batch(&[1, 2], None, None).unwrap();
}
let res = row_group_writer.close_column(col_writer);
assert!(res.is_err());
if let Err(err) = res {
assert_eq!(
format!("{}", err),
"Parquet error: Incorrect number of rows, expected 3 != 2 rows"
);
}
}
#[test]
fn test_file_writer_empty_file() {
let file = get_temp_file("test_file_writer_write_empty_file", &[]);
let schema = Rc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![Rc::new(
types::Type::primitive_type_builder("col1", Type::INT32)
.build()
.unwrap(),
)])
.build()
.unwrap(),
);
let props = Rc::new(WriterProperties::builder().build());
let mut writer =
SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
writer.close().unwrap();
let reader = SerializedFileReader::new(file).unwrap();
assert_eq!(reader.get_row_iter(None).unwrap().count(), 0);
}
#[test]
fn test_file_writer_empty_row_groups() {
let file = get_temp_file("test_file_writer_write_empty_row_groups", &[]);
test_file_roundtrip(file, vec![]);
}
#[test]
fn test_file_writer_single_row_group() {
let file = get_temp_file("test_file_writer_write_single_row_group", &[]);
test_file_roundtrip(file, vec![vec![1, 2, 3, 4, 5]]);
}
#[test]
fn test_file_writer_multiple_row_groups() {
let file = get_temp_file("test_file_writer_write_multiple_row_groups", &[]);
test_file_roundtrip(
file,
vec![
vec![1, 2, 3, 4, 5],
vec![1, 2, 3],
vec![1],
vec![1, 2, 3, 4, 5, 6],
],
);
}
#[test]
fn test_file_writer_multiple_large_row_groups() {
let file = get_temp_file("test_file_writer_multiple_large_row_groups", &[]);
test_file_roundtrip(
file,
vec![vec![123; 1024], vec![124; 1000], vec![125; 15], vec![]],
);
}
#[test]
fn test_page_writer_data_pages() {
let pages = vec![
Page::DataPage {
buf: ByteBufferPtr::new(vec![1, 2, 3, 4, 5, 6, 7, 8]),
num_values: 10,
encoding: Encoding::DELTA_BINARY_PACKED,
def_level_encoding: Encoding::RLE,
rep_level_encoding: Encoding::RLE,
statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true)),
},
Page::DataPageV2 {
buf: ByteBufferPtr::new(vec![4; 128]),
num_values: 10,
encoding: Encoding::DELTA_BINARY_PACKED,
num_nulls: 2,
num_rows: 12,
def_levels_byte_len: 24,
rep_levels_byte_len: 32,
is_compressed: false,
statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true)),
},
];
test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
}
#[test]
fn test_page_writer_dict_pages() {
let pages = vec![
Page::DictionaryPage {
buf: ByteBufferPtr::new(vec![1, 2, 3, 4, 5]),
num_values: 5,
encoding: Encoding::RLE_DICTIONARY,
is_sorted: false,
},
Page::DataPage {
buf: ByteBufferPtr::new(vec![1, 2, 3, 4, 5, 6, 7, 8]),
num_values: 10,
encoding: Encoding::DELTA_BINARY_PACKED,
def_level_encoding: Encoding::RLE,
rep_level_encoding: Encoding::RLE,
statistics: Some(Statistics::int32(Some(1), Some(3), None, 7, true)),
},
Page::DataPageV2 {
buf: ByteBufferPtr::new(vec![4; 128]),
num_values: 10,
encoding: Encoding::DELTA_BINARY_PACKED,
num_nulls: 2,
num_rows: 12,
def_levels_byte_len: 24,
rep_levels_byte_len: 32,
is_compressed: false,
statistics: None,
},
];
test_page_roundtrip(&pages[..], Compression::SNAPPY, Type::INT32);
test_page_roundtrip(&pages[..], Compression::UNCOMPRESSED, Type::INT32);
}
/// Tests writing and reading pages.
/// The physical type is used for statistics only and should match the statistics type
/// defined in the pages.
fn test_page_roundtrip(pages: &[Page], codec: Compression, physical_type: Type) {
let mut compressed_pages = vec![];
let mut total_num_values = 0i64;
let mut compressor = create_codec(codec).unwrap();
for page in pages {
let uncompressed_len = page.buffer().len();
let compressed_page = match page {
&Page::DataPage {
ref buf,
num_values,
encoding,
def_level_encoding,
rep_level_encoding,
ref statistics,
} => {
total_num_values += num_values as i64;
let output_buf = compress_helper(compressor.as_mut(), buf.data());
Page::DataPage {
buf: ByteBufferPtr::new(output_buf),
num_values,
encoding,
def_level_encoding,
rep_level_encoding,
statistics: from_thrift(
physical_type,
to_thrift(statistics.as_ref()),
),
}
}
&Page::DataPageV2 {
ref buf,
num_values,
encoding,
num_nulls,
num_rows,
def_levels_byte_len,
rep_levels_byte_len,
ref statistics,
..
} => {
total_num_values += num_values as i64;
let offset = (def_levels_byte_len + rep_levels_byte_len) as usize;
let cmp_buf =
compress_helper(compressor.as_mut(), &buf.data()[offset..]);
let mut output_buf = Vec::from(&buf.data()[..offset]);
output_buf.extend_from_slice(&cmp_buf[..]);
Page::DataPageV2 {
buf: ByteBufferPtr::new(output_buf),
num_values,
encoding,
num_nulls,
num_rows,
def_levels_byte_len,
rep_levels_byte_len,
is_compressed: compressor.is_some(),
statistics: from_thrift(
physical_type,
to_thrift(statistics.as_ref()),
),
}
}
&Page::DictionaryPage {
ref buf,
num_values,
encoding,
is_sorted,
} => {
let output_buf = compress_helper(compressor.as_mut(), buf.data());
Page::DictionaryPage {
buf: ByteBufferPtr::new(output_buf),
num_values,
encoding,
is_sorted,
}
}
};
let compressed_page = CompressedPage::new(compressed_page, uncompressed_len);
compressed_pages.push(compressed_page);
}
let mut buffer: Vec<u8> = vec![];
let mut result_pages: Vec<Page> = vec![];
{
let cursor = Cursor::new(&mut buffer);
let mut page_writer = SerializedPageWriter::new(cursor);
for page in compressed_pages {
page_writer.write_page(page).unwrap();
}
page_writer.close().unwrap();
}
{
let mut page_reader = SerializedPageReader::new(
Cursor::new(&buffer),
total_num_values,
codec,
physical_type,
)
.unwrap();
while let Some(page) = page_reader.get_next_page().unwrap() {
result_pages.push(page);
}
}
assert_eq!(result_pages.len(), pages.len());
for i in 0..result_pages.len() {
assert_page(&result_pages[i], &pages[i]);
}
}
/// Helper function to compress a slice
fn compress_helper(compressor: Option<&mut Box<Codec>>, data: &[u8]) -> Vec<u8> {
let mut output_buf = vec![];
if let Some(cmpr) = compressor {
cmpr.compress(data, &mut output_buf).unwrap();
} else {
output_buf.extend_from_slice(data);
}
output_buf
}
/// Check if pages match.
fn assert_page(left: &Page, right: &Page) {
assert_eq!(left.page_type(), right.page_type());
assert_eq!(left.buffer().data(), right.buffer().data());
assert_eq!(left.num_values(), right.num_values());
assert_eq!(left.encoding(), right.encoding());
assert_eq!(to_thrift(left.statistics()), to_thrift(right.statistics()));
}
/// File write-read roundtrip.
/// `data` consists of arrays of values for each row group.
fn test_file_roundtrip(file: File, data: Vec<Vec<i32>>) {
let schema = Rc::new(
types::Type::group_type_builder("schema")
.with_fields(&mut vec![Rc::new(
types::Type::primitive_type_builder("col1", Type::INT32)
.with_repetition(Repetition::REQUIRED)
.build()
.unwrap(),
)])
.build()
.unwrap(),
);
let props = Rc::new(WriterProperties::builder().build());
let mut file_writer =
SerializedFileWriter::new(file.try_clone().unwrap(), schema, props).unwrap();
let mut rows: i64 = 0;
for subset in &data {
let mut row_group_writer = file_writer.next_row_group().unwrap();
let col_writer = row_group_writer.next_column().unwrap();
if let Some(mut writer) = col_writer {
match writer {
ColumnWriter::Int32ColumnWriter(ref mut typed) => {
rows +=
typed.write_batch(&subset[..], None, None).unwrap() as i64;
}
_ => {
unimplemented!();
}
}
row_group_writer.close_column(writer).unwrap();
}
file_writer.close_row_group(row_group_writer).unwrap();
}
file_writer.close().unwrap();
let reader = SerializedFileReader::new(file).unwrap();
assert_eq!(reader.num_row_groups(), data.len());
assert_eq!(
reader.metadata().file_metadata().num_rows(),
rows,
"row count in metadata not equal to number of rows written"
);
for i in 0..reader.num_row_groups() {
let row_group_reader = reader.get_row_group(i).unwrap();
let iter = row_group_reader.get_row_iter(None).unwrap();
let res = iter
.map(|elem| elem.get_int(0).unwrap())
.collect::<Vec<i32>>();
assert_eq!(res, data[i]);
}
}
}