blob: 4dd7da910fd0025f3170d319eee5e21bec389c39 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use std::cmp::{max, min};
use std::mem::{replace, size_of};
use crate::column::{page::PageReader, reader::ColumnReaderImpl};
use crate::data_type::DataType;
use crate::errors::{ParquetError, Result};
use crate::schema::types::ColumnDescPtr;
use arrow::array::BooleanBufferBuilder;
use arrow::bitmap::Bitmap;
use arrow::buffer::{Buffer, MutableBuffer};
const MIN_BATCH_SIZE: usize = 1024;
/// A `RecordReader` is a stateful column reader that delimits semantic records.
pub struct RecordReader<T: DataType> {
column_desc: ColumnDescPtr,
records: MutableBuffer,
def_levels: Option<MutableBuffer>,
rep_levels: Option<MutableBuffer>,
null_bitmap: Option<BooleanBufferBuilder>,
column_reader: Option<ColumnReaderImpl<T>>,
/// Number of records accumulated in records
num_records: usize,
/// Number of values `num_records` contains.
num_values: usize,
values_seen: usize,
/// Starts from 1, number of values have been written to buffer
values_written: usize,
in_middle_of_record: bool,
}
impl<T: DataType> RecordReader<T> {
pub fn new(column_schema: ColumnDescPtr) -> Self {
let (def_levels, null_map) = if column_schema.max_def_level() > 0 {
(
Some(MutableBuffer::new(MIN_BATCH_SIZE)),
Some(BooleanBufferBuilder::new(0)),
)
} else {
(None, None)
};
let rep_levels = if column_schema.max_rep_level() > 0 {
Some(MutableBuffer::new(MIN_BATCH_SIZE))
} else {
None
};
Self {
records: MutableBuffer::new(MIN_BATCH_SIZE),
def_levels,
rep_levels,
null_bitmap: null_map,
column_reader: None,
column_desc: column_schema,
num_records: 0,
num_values: 0,
values_seen: 0,
values_written: 0,
in_middle_of_record: false,
}
}
/// Set the current page reader.
pub fn set_page_reader(&mut self, page_reader: Box<dyn PageReader>) -> Result<()> {
self.column_reader =
Some(ColumnReaderImpl::new(self.column_desc.clone(), page_reader));
Ok(())
}
/// Try to read `num_records` of column data into internal buffer.
///
/// # Returns
///
/// Number of actual records read.
pub fn read_records(&mut self, num_records: usize) -> Result<usize> {
if self.column_reader.is_none() {
return Ok(0);
}
let mut records_read = 0;
// Used to mark whether we have reached the end of current
// column chunk
let mut end_of_column = false;
loop {
// Try to find some records from buffers that has been read into memory
// but not counted as seen records.
records_read += self.split_records(num_records - records_read)?;
// Since page reader contains complete records, so if we reached end of a
// page reader, we should reach the end of a record
if end_of_column
&& self.values_seen >= self.values_written
&& self.in_middle_of_record
{
self.num_records += 1;
self.num_values = self.values_seen;
self.in_middle_of_record = false;
records_read += 1;
}
if (records_read >= num_records) || end_of_column {
break;
}
let batch_size = max(num_records - records_read, MIN_BATCH_SIZE);
// Try to more value from parquet pages
let values_read = self.read_one_batch(batch_size)?;
if values_read < batch_size {
end_of_column = true;
}
}
Ok(records_read)
}
/// Returns number of records stored in buffer.
pub fn num_records(&self) -> usize {
self.num_records
}
/// Return number of values stored in buffer.
/// If the parquet column is not repeated, it should be equals to `num_records`,
/// otherwise it should be larger than or equal to `num_records`.
pub fn num_values(&self) -> usize {
self.num_values
}
/// Returns definition level data.
/// The implementation has side effects. It will create a new buffer to hold those
/// definition level values that have already been read into memory but not counted
/// as record values, e.g. those from `self.num_values` to `self.values_written`.
pub fn consume_def_levels(&mut self) -> Result<Option<Buffer>> {
let new_buffer = if let Some(ref mut def_levels_buf) = &mut self.def_levels {
let num_left_values = self.values_written - self.num_values;
// create an empty buffer, as it will be resized below
let mut new_buffer = MutableBuffer::new(0);
let num_bytes = num_left_values * size_of::<i16>();
let new_len = self.num_values * size_of::<i16>();
new_buffer.resize(num_bytes, 0);
let new_def_levels = new_buffer.as_slice_mut();
let left_def_levels = &def_levels_buf.as_slice_mut()[new_len..];
new_def_levels[0..num_bytes].copy_from_slice(&left_def_levels[0..num_bytes]);
def_levels_buf.resize(new_len, 0);
Some(new_buffer)
} else {
None
};
Ok(replace(&mut self.def_levels, new_buffer).map(|x| x.into()))
}
/// Return repetition level data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_rep_levels(&mut self) -> Result<Option<Buffer>> {
// TODO: Optimize to reduce the copy
let new_buffer = if let Some(ref mut rep_levels_buf) = &mut self.rep_levels {
let num_left_values = self.values_written - self.num_values;
// create an empty buffer, as it will be resized below
let mut new_buffer = MutableBuffer::new(0);
let num_bytes = num_left_values * size_of::<i16>();
let new_len = self.num_values * size_of::<i16>();
new_buffer.resize(num_bytes, 0);
let new_rep_levels = new_buffer.as_slice_mut();
let left_rep_levels = &rep_levels_buf.as_slice_mut()[new_len..];
new_rep_levels[0..num_bytes].copy_from_slice(&left_rep_levels[0..num_bytes]);
rep_levels_buf.resize(new_len, 0);
Some(new_buffer)
} else {
None
};
Ok(replace(&mut self.rep_levels, new_buffer).map(|x| x.into()))
}
/// Returns currently stored buffer data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_record_data(&mut self) -> Result<Buffer> {
// TODO: Optimize to reduce the copy
let num_left_values = self.values_written - self.num_values;
// create an empty buffer, as it will be resized below
let mut new_buffer = MutableBuffer::new(0);
let num_bytes = num_left_values * T::get_type_size();
let new_len = self.num_values * T::get_type_size();
new_buffer.resize(num_bytes, 0);
let new_records = new_buffer.as_slice_mut();
let left_records = &mut self.records.as_slice_mut()[new_len..];
new_records[0..num_bytes].copy_from_slice(&left_records[0..num_bytes]);
self.records.resize(new_len, 0);
Ok(replace(&mut self.records, new_buffer).into())
}
/// Returns currently stored null bitmap data.
/// The side effect is similar to `consume_def_levels`.
pub fn consume_bitmap_buffer(&mut self) -> Result<Option<Buffer>> {
// TODO: Optimize to reduce the copy
if self.column_desc.max_def_level() > 0 {
assert!(self.null_bitmap.is_some());
let num_left_values = self.values_written - self.num_values;
let new_bitmap_builder = Some(BooleanBufferBuilder::new(max(
MIN_BATCH_SIZE,
num_left_values,
)));
let old_bitmap = replace(&mut self.null_bitmap, new_bitmap_builder)
.map(|mut builder| builder.finish())
.unwrap();
let old_bitmap = Bitmap::from(old_bitmap);
for i in self.num_values..self.values_written {
self.null_bitmap
.as_mut()
.unwrap()
.append(old_bitmap.is_set(i));
}
Ok(Some(old_bitmap.into_buffer()))
} else {
Ok(None)
}
}
/// Reset state of record reader.
/// Should be called after consuming data, e.g. `consume_rep_levels`,
/// `consume_rep_levels`, `consume_record_data` and `consume_bitmap_buffer`.
pub fn reset(&mut self) {
self.values_written -= self.num_values;
self.num_records = 0;
self.num_values = 0;
self.values_seen = 0;
self.in_middle_of_record = false;
}
/// Returns bitmap data.
pub fn consume_bitmap(&mut self) -> Result<Option<Bitmap>> {
self.consume_bitmap_buffer()
.map(|buffer| buffer.map(Bitmap::from))
}
/// Try to read one batch of data.
fn read_one_batch(&mut self, batch_size: usize) -> Result<usize> {
// Reserve spaces
self.records
.resize(self.records.len() + batch_size * T::get_type_size(), 0);
if let Some(ref mut buf) = self.rep_levels {
buf.resize(buf.len() + batch_size * size_of::<i16>(), 0);
}
if let Some(ref mut buf) = self.def_levels {
buf.resize(buf.len() + batch_size * size_of::<i16>(), 0);
}
let values_written = self.values_written;
// Convert mutable buffer spaces to mutable slices
let (prefix, values, suffix) =
unsafe { self.records.as_slice_mut().align_to_mut::<T::T>() };
assert!(prefix.is_empty() && suffix.is_empty());
let values = &mut values[values_written..];
let def_levels = self.def_levels.as_mut().map(|buf| {
let (prefix, def_levels, suffix) =
unsafe { buf.as_slice_mut().align_to_mut::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&mut def_levels[values_written..]
});
let rep_levels = self.rep_levels.as_mut().map(|buf| {
let (prefix, rep_levels, suffix) =
unsafe { buf.as_slice_mut().align_to_mut::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&mut rep_levels[values_written..]
});
let (values_read, levels_read) = self
.column_reader
.as_mut()
.unwrap()
.read_batch(batch_size, def_levels, rep_levels, values)?;
// get new references for the def levels.
let def_levels = self.def_levels.as_ref().map(|buf| {
let (prefix, def_levels, suffix) =
unsafe { buf.as_slice().align_to::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
&def_levels[values_written..]
});
let max_def_level = self.column_desc.max_def_level();
if values_read < levels_read {
let def_levels = def_levels.ok_or_else(|| {
general_err!(
"Definition levels should exist when data is less than levels!"
)
})?;
// Fill spaces in column data with default values
let mut values_pos = values_read;
let mut level_pos = levels_read;
while level_pos > values_pos {
if def_levels[level_pos - 1] == max_def_level {
// This values is not empty
// We use swap rather than assign here because T::T doesn't
// implement Copy
values.swap(level_pos - 1, values_pos - 1);
values_pos -= 1;
} else {
values[level_pos - 1] = T::T::default();
}
level_pos -= 1;
}
}
// Fill in bitmap data
if let Some(null_buffer) = self.null_bitmap.as_mut() {
let def_levels = def_levels.ok_or_else(|| {
general_err!(
"Definition levels should exist when data is less than levels!"
)
})?;
(0..levels_read)
.for_each(|idx| null_buffer.append(def_levels[idx] == max_def_level));
}
let values_read = max(values_read, levels_read);
self.set_values_written(self.values_written + values_read)?;
Ok(values_read)
}
/// Split values into records according repetition definition and returns number of
/// records read.
#[allow(clippy::unnecessary_wraps)]
fn split_records(&mut self, records_to_read: usize) -> Result<usize> {
let rep_levels = self.rep_levels.as_ref().map(|buf| {
let (prefix, rep_levels, suffix) =
unsafe { buf.as_slice().align_to::<i16>() };
assert!(prefix.is_empty() && suffix.is_empty());
rep_levels
});
match rep_levels {
Some(buf) => {
let mut records_read = 0;
while (self.values_seen < self.values_written)
&& (records_read < records_to_read)
{
if buf[self.values_seen] == 0 {
if self.in_middle_of_record {
records_read += 1;
self.num_records += 1;
self.num_values = self.values_seen;
}
self.in_middle_of_record = true;
}
self.values_seen += 1;
}
Ok(records_read)
}
None => {
let records_read =
min(records_to_read, self.values_written - self.values_seen);
self.num_records += records_read;
self.num_values += records_read;
self.values_seen += records_read;
self.in_middle_of_record = false;
Ok(records_read)
}
}
}
#[allow(clippy::unnecessary_wraps)]
fn set_values_written(&mut self, new_values_written: usize) -> Result<()> {
self.values_written = new_values_written;
self.records
.resize(self.values_written * T::get_type_size(), 0);
let new_levels_len = self.values_written * size_of::<i16>();
if let Some(ref mut buf) = self.rep_levels {
buf.resize(new_levels_len, 0)
};
if let Some(ref mut buf) = self.def_levels {
buf.resize(new_levels_len, 0)
};
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::RecordReader;
use crate::basic::Encoding;
use crate::column::page::Page;
use crate::column::page::PageReader;
use crate::data_type::Int32Type;
use crate::errors::Result;
use crate::schema::parser::parse_message_type;
use crate::schema::types::SchemaDescriptor;
use crate::util::test_common::page_util::{DataPageBuilder, DataPageBuilderImpl};
use arrow::array::{BooleanBufferBuilder, Int16BufferBuilder, Int32BufferBuilder};
use arrow::bitmap::Bitmap;
use std::sync::Arc;
struct TestPageReader {
pages: Box<dyn Iterator<Item = Page>>,
}
impl TestPageReader {
pub fn new(pages: Vec<Page>) -> Self {
Self {
pages: Box::new(pages.into_iter()),
}
}
}
impl PageReader for TestPageReader {
fn get_next_page(&mut self) -> Result<Option<Page>> {
Ok(self.pages.next())
}
}
impl Iterator for TestPageReader {
type Item = Result<Page>;
fn next(&mut self) -> Option<Self::Item> {
self.get_next_page().transpose()
}
}
#[test]
fn test_read_required_records() {
// Construct column schema
let message_type = "
message test_schema {
REQUIRED INT32 leaf;
}
";
let desc = parse_message_type(message_type)
.map(|t| SchemaDescriptor::new(Arc::new(t)))
.map(|s| s.column(0))
.unwrap();
// Construct record reader
let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
// First page
// Records data:
// test_schema
// leaf: 4
// test_schema
// leaf: 7
// test_schema
// leaf: 6
// test_schema
// left: 3
// test_schema
// left: 2
{
let values = [4, 7, 6, 3, 2];
let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
let page_reader = Box::new(TestPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(2).unwrap());
assert_eq!(2, record_reader.num_records());
assert_eq!(2, record_reader.num_values());
assert_eq!(3, record_reader.read_records(3).unwrap());
assert_eq!(5, record_reader.num_records());
assert_eq!(5, record_reader.num_values());
}
// Second page
// Records data:
// test_schema
// leaf: 8
// test_schema
// leaf: 9
{
let values = [8, 9];
let mut pb = DataPageBuilderImpl::new(desc, 2, true);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
let page_reader = Box::new(TestPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(10).unwrap());
assert_eq!(7, record_reader.num_records());
assert_eq!(7, record_reader.num_values());
}
let mut bb = Int32BufferBuilder::new(7);
bb.append_slice(&[4, 7, 6, 3, 2, 8, 9]);
let expected_buffer = bb.finish();
assert_eq!(
expected_buffer,
record_reader.consume_record_data().unwrap()
);
assert_eq!(None, record_reader.consume_def_levels().unwrap());
assert_eq!(None, record_reader.consume_bitmap().unwrap());
}
#[test]
fn test_read_optional_records() {
// Construct column schema
let message_type = "
message test_schema {
OPTIONAL Group test_struct {
OPTIONAL INT32 leaf;
}
}
";
let desc = parse_message_type(message_type)
.map(|t| SchemaDescriptor::new(Arc::new(t)))
.map(|s| s.column(0))
.unwrap();
// Construct record reader
let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
// First page
// Records data:
// test_schema
// test_struct
// test_schema
// test_struct
// left: 7
// test_schema
// test_schema
// test_struct
// leaf: 6
// test_schema
// test_struct
// leaf: 6
{
let values = [7, 6, 3];
//empty, non-empty, empty, non-empty, non-empty
let def_levels = [1i16, 2i16, 0i16, 2i16, 2i16];
let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
pb.add_def_levels(2, &def_levels);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
let page_reader = Box::new(TestPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(2).unwrap());
assert_eq!(2, record_reader.num_records());
assert_eq!(2, record_reader.num_values());
assert_eq!(3, record_reader.read_records(3).unwrap());
assert_eq!(5, record_reader.num_records());
assert_eq!(5, record_reader.num_values());
}
// Second page
// Records data:
// test_schema
// test_schema
// test_struct
// left: 8
{
let values = [8];
//empty, non-empty
let def_levels = [0i16, 2i16];
let mut pb = DataPageBuilderImpl::new(desc, 2, true);
pb.add_def_levels(2, &def_levels);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
let page_reader = Box::new(TestPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(2, record_reader.read_records(10).unwrap());
assert_eq!(7, record_reader.num_records());
assert_eq!(7, record_reader.num_values());
}
// Verify result record data
let mut bb = Int32BufferBuilder::new(7);
bb.append_slice(&[0, 7, 0, 6, 3, 0, 8]);
let expected_buffer = bb.finish();
assert_eq!(
expected_buffer,
record_reader.consume_record_data().unwrap()
);
// Verify result def levels
let mut bb = Int16BufferBuilder::new(7);
bb.append_slice(&[1i16, 2i16, 0i16, 2i16, 2i16, 0i16, 2i16]);
let expected_def_levels = bb.finish();
assert_eq!(
Some(expected_def_levels),
record_reader.consume_def_levels().unwrap()
);
// Verify bitmap
let mut bb = BooleanBufferBuilder::new(7);
bb.append_slice(&[false, true, false, true, true, false, true]);
let expected_bitmap = Bitmap::from(bb.finish());
assert_eq!(
Some(expected_bitmap),
record_reader.consume_bitmap().unwrap()
);
}
#[test]
fn test_read_repeated_records() {
// Construct column schema
let message_type = "
message test_schema {
REPEATED Group test_struct {
REPEATED INT32 leaf;
}
}
";
let desc = parse_message_type(message_type)
.map(|t| SchemaDescriptor::new(Arc::new(t)))
.map(|s| s.column(0))
.unwrap();
// Construct record reader
let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
// First page
// Records data:
// test_schema
// test_struct
// leaf: 4
// test_schema
// test_schema
// test_struct
// test_struct
// leaf: 7
// leaf: 6
// leaf: 3
// test_struct
// leaf: 2
{
let values = [4, 7, 6, 3, 2];
let def_levels = [2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16];
let rep_levels = [0i16, 0i16, 0i16, 1i16, 2i16, 2i16, 1i16];
let mut pb = DataPageBuilderImpl::new(desc.clone(), 7, true);
pb.add_rep_levels(2, &rep_levels);
pb.add_def_levels(2, &def_levels);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
let page_reader = Box::new(TestPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(1, record_reader.read_records(1).unwrap());
assert_eq!(1, record_reader.num_records());
assert_eq!(1, record_reader.num_values());
assert_eq!(2, record_reader.read_records(3).unwrap());
assert_eq!(3, record_reader.num_records());
assert_eq!(7, record_reader.num_values());
}
// Second page
// Records data:
// test_schema
// test_struct
// leaf: 8
// leaf: 9
{
let values = [8, 9];
let def_levels = [2i16, 2i16];
let rep_levels = [0i16, 2i16];
let mut pb = DataPageBuilderImpl::new(desc, 2, true);
pb.add_rep_levels(2, &rep_levels);
pb.add_def_levels(2, &def_levels);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
let page_reader = Box::new(TestPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(1, record_reader.read_records(10).unwrap());
assert_eq!(4, record_reader.num_records());
assert_eq!(9, record_reader.num_values());
}
// Verify result record data
let mut bb = Int32BufferBuilder::new(9);
bb.append_slice(&[4, 0, 0, 7, 6, 3, 2, 8, 9]);
let expected_buffer = bb.finish();
assert_eq!(
expected_buffer,
record_reader.consume_record_data().unwrap()
);
// Verify result def levels
let mut bb = Int16BufferBuilder::new(9);
bb.append_slice(&[2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16, 2i16, 2i16]);
let expected_def_levels = bb.finish();
assert_eq!(
Some(expected_def_levels),
record_reader.consume_def_levels().unwrap()
);
// Verify bitmap
let mut bb = BooleanBufferBuilder::new(9);
bb.append_slice(&[true, false, false, true, true, true, true, true, true]);
let expected_bitmap = Bitmap::from(bb.finish());
assert_eq!(
Some(expected_bitmap),
record_reader.consume_bitmap().unwrap()
);
}
#[test]
fn test_read_more_than_one_batch() {
// Construct column schema
let message_type = "
message test_schema {
REPEATED INT32 leaf;
}
";
let desc = parse_message_type(message_type)
.map(|t| SchemaDescriptor::new(Arc::new(t)))
.map(|s| s.column(0))
.unwrap();
// Construct record reader
let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
{
let values = [100; 5000];
let def_levels = [1i16; 5000];
let mut rep_levels = [1i16; 5000];
for idx in 0..1000 {
rep_levels[idx * 5] = 0i16;
}
let mut pb = DataPageBuilderImpl::new(desc, 5000, true);
pb.add_rep_levels(1, &rep_levels);
pb.add_def_levels(1, &def_levels);
pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
let page = pb.consume();
let page_reader = Box::new(TestPageReader::new(vec![page]));
record_reader.set_page_reader(page_reader).unwrap();
assert_eq!(1000, record_reader.read_records(1000).unwrap());
assert_eq!(1000, record_reader.num_records());
assert_eq!(5000, record_reader.num_values());
}
}
}