// blob: b75d3b5028bb40f2a4cdda225ac740e03f5f4fc8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! Contains Parquet Page definitions and the page reader, page writer and page iterator
//! interfaces.
use crate::basic::{Encoding, PageType};
use crate::errors::Result;
use crate::file::{metadata::ColumnChunkMetaData, statistics::Statistics};
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
use crate::util::memory::ByteBufferPtr;
/// Parquet Page definition.
///
/// Lists the supported page kinds: data page (v1), data page v2 and dictionary page.
/// The fields are mapped 1-to-1 from the equivalent Thrift definitions, except `buf`,
/// which is used to store the uncompressed bytes of the page.
///
/// `Clone` is derived, so cloning a page clones each field through its own `Clone` impl.
#[derive(Clone)]
pub enum Page {
/// Data page (v1); `Page::page_type` reports it as `PageType::DATA_PAGE`.
DataPage {
/// Byte buffer of the page, exposed through `Page::buffer`.
buf: ByteBufferPtr,
/// Number of values in this page, exposed through `Page::num_values`.
num_values: u32,
/// Encoding of this page, exposed through `Page::encoding`.
encoding: Encoding,
// NOTE(review): presumably the encoding used for definition levels, mirroring the
// Thrift header field of the same name -- confirm against the Thrift definition.
def_level_encoding: Encoding,
// NOTE(review): presumably the encoding used for repetition levels, mirroring the
// Thrift header field of the same name -- confirm against the Thrift definition.
rep_level_encoding: Encoding,
/// Optional statistics for this page, exposed through `Page::statistics`.
statistics: Option<Statistics>,
},
/// Data page v2; `Page::page_type` reports it as `PageType::DATA_PAGE_V2`.
DataPageV2 {
/// Byte buffer of the page, exposed through `Page::buffer`.
buf: ByteBufferPtr,
/// Number of values in this page, exposed through `Page::num_values`.
num_values: u32,
/// Encoding of this page, exposed through `Page::encoding`.
encoding: Encoding,
// NOTE(review): presumably the count of null values in the page -- confirm against
// the Thrift definition.
num_nulls: u32,
// NOTE(review): presumably the count of rows in the page -- confirm against the
// Thrift definition.
num_rows: u32,
// NOTE(review): presumably the length in bytes of the definition levels section of
// `buf` -- confirm against the Thrift definition.
def_levels_byte_len: u32,
// NOTE(review): presumably the length in bytes of the repetition levels section of
// `buf` -- confirm against the Thrift definition.
rep_levels_byte_len: u32,
// NOTE(review): presumably mirrors the Thrift `is_compressed` flag; how it relates to
// the "always uncompressed" `buf` contract is not visible here -- confirm with callers.
is_compressed: bool,
/// Optional statistics for this page, exposed through `Page::statistics`.
statistics: Option<Statistics>,
},
/// Dictionary page; `Page::page_type` reports it as `PageType::DICTIONARY_PAGE`.
/// It carries no statistics, so `Page::statistics` returns `None` for it.
DictionaryPage {
/// Byte buffer of the page, exposed through `Page::buffer`.
buf: ByteBufferPtr,
/// Number of values in this page, exposed through `Page::num_values`.
num_values: u32,
/// Encoding of this page, exposed through `Page::encoding`.
encoding: Encoding,
// NOTE(review): presumably indicates whether the dictionary entries are sorted,
// mirroring the Thrift field of the same name -- confirm.
is_sorted: bool,
},
}
impl Page {
    /// Reports the [`PageType`](crate::basic::PageType) that corresponds to this
    /// page variant.
    pub fn page_type(&self) -> PageType {
        match *self {
            Page::DataPage { .. } => PageType::DATA_PAGE,
            Page::DataPageV2 { .. } => PageType::DATA_PAGE_V2,
            Page::DictionaryPage { .. } => PageType::DICTIONARY_PAGE,
        }
    }

    /// Borrows the byte buffer stored in this page.
    ///
    /// Every variant carries a `buf` field, so the lookup is the same for all of them.
    pub fn buffer(&self) -> &ByteBufferPtr {
        match self {
            Page::DataPage { buf, .. }
            | Page::DataPageV2 { buf, .. }
            | Page::DictionaryPage { buf, .. } => buf,
        }
    }

    /// Reports how many values this page holds.
    pub fn num_values(&self) -> u32 {
        match *self {
            Page::DataPage { num_values, .. }
            | Page::DataPageV2 { num_values, .. }
            | Page::DictionaryPage { num_values, .. } => num_values,
        }
    }

    /// Reports the [`Encoding`](crate::basic::Encoding) recorded for this page.
    pub fn encoding(&self) -> Encoding {
        match *self {
            Page::DataPage { encoding, .. }
            | Page::DataPageV2 { encoding, .. }
            | Page::DictionaryPage { encoding, .. } => encoding,
        }
    }

    /// Borrows the page [`Statistics`](crate::file::statistics::Statistics), if any.
    ///
    /// Dictionary pages have no statistics field, so they always yield `None`.
    pub fn statistics(&self) -> Option<&Statistics> {
        match self {
            Page::DataPage { statistics, .. } | Page::DataPageV2 { statistics, .. } => {
                statistics.as_ref()
            }
            Page::DictionaryPage { .. } => None,
        }
    }
}
/// Helper struct to represent pages with a potentially compressed buffer (data page v1)
/// or a compressed and concatenated buffer (def levels + rep levels + compressed values
/// for data page v2).
///
/// The difference from `Page` is that a `Page` buffer is always uncompressed.
pub struct CompressedPage {
/// Underlying page whose buffer is potentially compressed.
compressed_page: Page,
/// Uncompressed size in bytes, exactly as supplied to `CompressedPage::new`.
uncompressed_size: usize,
}
impl CompressedPage {
/// Creates `CompressedPage` from a page with potentially compressed buffer and
/// uncompressed size.
pub fn new(compressed_page: Page, uncompressed_size: usize) -> Self {
Self {
compressed_page,
uncompressed_size,
}
}
/// Returns page type.
pub fn page_type(&self) -> PageType {
self.compressed_page.page_type()
}
/// Returns underlying page with potentially compressed buffer.
pub fn compressed_page(&self) -> &Page {
&self.compressed_page
}
/// Returns uncompressed size in bytes.
pub fn uncompressed_size(&self) -> usize {
self.uncompressed_size
}
/// Returns compressed size in bytes.
///
/// Note that it is assumed that buffer is compressed, but it may not be. In this
/// case compressed size will be equal to uncompressed size.
pub fn compressed_size(&self) -> usize {
self.compressed_page.buffer().len()
}
/// Number of values in page.
pub fn num_values(&self) -> u32 {
self.compressed_page.num_values()
}
/// Returns encoding for values in page.
pub fn encoding(&self) -> Encoding {
self.compressed_page.encoding()
}
/// Returns slice of compressed buffer in the page.
pub fn data(&self) -> &[u8] {
self.compressed_page.buffer().data()
}
}
/// Contains page write metrics.
///
/// All fields are public; `PageWriteSpec::new` initialises them to default values.
pub struct PageWriteSpec {
/// Type of the page; `PageWriteSpec::new` sets it to `PageType::DATA_PAGE`.
pub page_type: PageType,
// NOTE(review): presumably the size in bytes of the page before compression -- confirm
// against the writer that fills this in.
pub uncompressed_size: usize,
// NOTE(review): presumably the size in bytes of the page after compression -- confirm
// against the writer that fills this in.
pub compressed_size: usize,
/// Number of values; `PageWriteSpec::new` sets it to 0.
pub num_values: u32,
// NOTE(review): presumably the position in the output sink at which the page was
// written -- confirm against the writer that fills this in.
pub offset: u64,
// NOTE(review): presumably the total number of bytes written for the page (possibly
// including its header) -- confirm against the writer that fills this in.
pub bytes_written: u64,
}
impl PageWriteSpec {
    /// Creates new spec with default page write metrics.
    ///
    /// The page type starts out as `PageType::DATA_PAGE` and every numeric metric
    /// (sizes, value count, offset, bytes written) starts out as zero.
    pub fn new() -> Self {
        Self {
            page_type: PageType::DATA_PAGE,
            uncompressed_size: 0,
            compressed_size: 0,
            num_values: 0,
            offset: 0,
            bytes_written: 0,
        }
    }
}

/// `Default` yields exactly the same value as [`PageWriteSpec::new`], so the type can be
/// used wherever a `Default` bound is required (for example `..Default::default()`).
impl Default for PageWriteSpec {
    fn default() -> Self {
        Self::new()
    }
}
/// API for reading pages from a column chunk.
///
/// This offers an iterator-like API to get the next page. Because of the supertrait
/// bound, every implementor must also implement `Iterator` with `Item = Result<Page>`.
pub trait PageReader: Iterator<Item = Result<Page>> {
/// Gets the next page in the column chunk associated with this reader.
///
/// Returns `Ok(None)` if there are no pages left. Failure is reported through the
/// `Err` variant; the exact error conditions are up to the implementation.
fn get_next_page(&mut self) -> Result<Option<Page>>;
}
/// API for writing pages in a column chunk.
///
/// It is reasonable to assume that all pages will be written in the correct order, e.g.
/// a dictionary page followed by data pages, or a set of data pages, etc.
pub trait PageWriter {
/// Writes a page into the output stream/sink.
///
/// Takes ownership of the `CompressedPage` and returns a `PageWriteSpec` that contains
/// information about the written page metrics, including number of bytes, size, number
/// of values, offset, etc.
///
/// This method is called for every compressed page we write into the underlying
/// buffer, either a data page or a dictionary page.
fn write_page(&mut self, page: CompressedPage) -> Result<PageWriteSpec>;
/// Writes column chunk metadata into the output stream/sink.
///
/// This method is called once before the page writer is closed, normally when writes
/// are finalised in the column writer.
fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()>;
/// Closes resources and flushes the underlying sink.
///
/// The page writer should not be used after this method is called.
fn close(&mut self) -> Result<()>;
}
/// An iterator over pages of some specific column in a parquet file.
///
/// Because of the supertrait bound, each iteration item is a
/// `Result<Box<dyn PageReader>>`, i.e. a boxed page reader or an error.
pub trait PageIterator: Iterator<Item = Result<Box<dyn PageReader>>> {
/// Gets the schema of the parquet file.
fn schema(&mut self) -> Result<SchemaDescPtr>;
/// Gets the column schema of this page iterator.
fn column_schema(&mut self) -> Result<ColumnDescPtr>;
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Raw bytes placed into the buffer of every page built by these tests.
    fn sample_bytes() -> Vec<u8> {
        vec![0, 1, 2]
    }

    /// Statistics attached to every data page built by these tests.
    fn sample_stats() -> Statistics {
        Statistics::int32(Some(1), Some(2), None, 1, true)
    }

    /// Checks the accessors of `Page` for each of its three variants.
    #[test]
    fn test_page() {
        let expected_bytes = sample_bytes();
        let expected_stats = sample_stats();

        // Data page (v1).
        let v1 = Page::DataPage {
            buf: ByteBufferPtr::new(sample_bytes()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: Some(sample_stats()),
        };
        assert_eq!(v1.page_type(), PageType::DATA_PAGE);
        assert_eq!(v1.buffer().data(), expected_bytes.as_slice());
        assert_eq!(v1.num_values(), 10);
        assert_eq!(v1.encoding(), Encoding::PLAIN);
        assert_eq!(v1.statistics(), Some(&expected_stats));

        // Data page v2.
        let v2 = Page::DataPageV2 {
            buf: ByteBufferPtr::new(sample_bytes()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            num_nulls: 5,
            num_rows: 20,
            def_levels_byte_len: 30,
            rep_levels_byte_len: 40,
            is_compressed: false,
            statistics: Some(sample_stats()),
        };
        assert_eq!(v2.page_type(), PageType::DATA_PAGE_V2);
        assert_eq!(v2.buffer().data(), expected_bytes.as_slice());
        assert_eq!(v2.num_values(), 10);
        assert_eq!(v2.encoding(), Encoding::PLAIN);
        assert_eq!(v2.statistics(), Some(&expected_stats));

        // Dictionary page: same accessors, but never any statistics.
        let dict = Page::DictionaryPage {
            buf: ByteBufferPtr::new(sample_bytes()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            is_sorted: false,
        };
        assert_eq!(dict.page_type(), PageType::DICTIONARY_PAGE);
        assert_eq!(dict.buffer().data(), expected_bytes.as_slice());
        assert_eq!(dict.num_values(), 10);
        assert_eq!(dict.encoding(), Encoding::PLAIN);
        assert_eq!(dict.statistics(), None);
    }

    /// Checks that `CompressedPage` forwards to the wrapped page and reports sizes.
    #[test]
    fn test_compressed_page() {
        let inner = Page::DataPage {
            buf: ByteBufferPtr::new(sample_bytes()),
            num_values: 10,
            encoding: Encoding::PLAIN,
            def_level_encoding: Encoding::RLE,
            rep_level_encoding: Encoding::RLE,
            statistics: Some(sample_stats()),
        };
        let wrapped = CompressedPage::new(inner, 5);

        assert_eq!(wrapped.page_type(), PageType::DATA_PAGE);
        assert_eq!(wrapped.uncompressed_size(), 5);
        // The compressed size is the length of the three-byte buffer.
        assert_eq!(wrapped.compressed_size(), 3);
        assert_eq!(wrapped.num_values(), 10);
        assert_eq!(wrapped.encoding(), Encoding::PLAIN);
        assert_eq!(wrapped.data(), &[0, 1, 2]);
    }
}