// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use super::array_reader::ArrayReader;
use crate::arrow::schema::parquet_to_arrow_field;
use crate::basic::Encoding;
use crate::errors::{ParquetError, Result};
use crate::{
column::page::{Page, PageIterator},
memory::ByteBufferPtr,
schema::types::{ColumnDescPtr, ColumnDescriptor},
};
use arrow::{
array::{ArrayRef, Int16Array},
buffer::MutableBuffer,
datatypes::{DataType as ArrowType, ToByteSlice},
};
use std::{any::Any, collections::VecDeque, marker::PhantomData};
use std::{cell::RefCell, rc::Rc};
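
/// Iterator that yields one component of a tuple-producing source iterator.
/// The components belonging to the other sides are buffered in the shared
/// state so that the sibling `UnzipIter`s can yield them later.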
struct UnzipIter<Source, Target, State> {
shared_state: Rc<RefCell<State>>,
select_item_buffer: fn(&mut State) -> &mut VecDeque<Target>,
consume_source_item: fn(source_item: Source, state: &mut State) -> Target,
}
impl<Source, Target, State> UnzipIter<Source, Target, State> {
fn new(
shared_state: Rc<RefCell<State>>,
item_buffer_selector: fn(&mut State) -> &mut VecDeque<Target>,
source_item_consumer: fn(source_item: Source, state: &mut State) -> Target,
) -> Self {
Self {
shared_state,
select_item_buffer: item_buffer_selector,
consume_source_item: source_item_consumer,
}
}
}
trait UnzipIterState<T> {
type SourceIter: Iterator<Item = T>;
fn source_iter(&mut self) -> &mut Self::SourceIter;
}
impl<Source, Target, State: UnzipIterState<Source>> Iterator
for UnzipIter<Source, Target, State>
{
type Item = Target;
fn next(&mut self) -> Option<Self::Item> {
let mut inner = self.shared_state.borrow_mut();
// try to get one from the stored data
(self.select_item_buffer)(&mut *inner)
.pop_front()
.or_else(||
// nothing stored, we need a new element.
inner.source_iter().next().map(|s| {
(self.consume_source_item)(s, &mut inner)
}))
}
}
struct PageBufferUnzipIterState<V, L, It> {
iter: It,
value_iter_buffer: VecDeque<V>,
def_level_iter_buffer: VecDeque<L>,
rep_level_iter_buffer: VecDeque<L>,
}
impl<V, L, It: Iterator<Item = (V, L, L)>> UnzipIterState<(V, L, L)>
for PageBufferUnzipIterState<V, L, It>
{
type SourceIter = It;
#[inline]
fn source_iter(&mut self) -> &mut Self::SourceIter {
&mut self.iter
}
}
type ValueUnzipIter<V, L, It> =
UnzipIter<(V, L, L), V, PageBufferUnzipIterState<V, L, It>>;
type LevelUnzipIter<V, L, It> =
UnzipIter<(V, L, L), L, PageBufferUnzipIterState<V, L, It>>;
type PageUnzipResult<V, L, It> = (
ValueUnzipIter<V, L, It>,
LevelUnzipIter<V, L, It>,
LevelUnzipIter<V, L, It>,
);
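
/// Splits an iterator of `(value, def level, rep level)` tuples into three
/// iterators, one per component. The iterators share buffered state, so items
/// pulled through one side are queued until the other sides consume them.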
fn unzip_iter<V, L, It: Iterator<Item = (V, L, L)>>(it: It) -> PageUnzipResult<V, L, It> {
let shared_data = Rc::new(RefCell::new(PageBufferUnzipIterState {
iter: it,
value_iter_buffer: VecDeque::new(),
def_level_iter_buffer: VecDeque::new(),
rep_level_iter_buffer: VecDeque::new(),
}));
let value_iter = UnzipIter::new(
shared_data.clone(),
|state| &mut state.value_iter_buffer,
|(v, d, r), state| {
state.def_level_iter_buffer.push_back(d);
state.rep_level_iter_buffer.push_back(r);
v
},
);
let def_level_iter = UnzipIter::new(
shared_data.clone(),
|state| &mut state.def_level_iter_buffer,
|(v, d, r), state| {
state.value_iter_buffer.push_back(v);
state.rep_level_iter_buffer.push_back(r);
d
},
);
let rep_level_iter = UnzipIter::new(
shared_data,
|state| &mut state.rep_level_iter_buffer,
|(v, d, r), state| {
state.value_iter_buffer.push_back(v);
state.def_level_iter_buffer.push_back(d);
r
},
);
(value_iter, def_level_iter, rep_level_iter)
}
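
/// Converts the raw value bytes read from a `ValueDecoder` into Arrow array
/// data of the appropriate type.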
pub trait ArrayConverter {
fn convert_value_bytes(
&self,
value_decoder: &mut impl ValueDecoder,
num_values: usize,
) -> Result<arrow::array::ArrayData>;
}
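
/// An `ArrayReader` that decodes values, definition levels and repetition
/// levels directly from a stream of Parquet pages, delegating the conversion
/// of the decoded value bytes to an `ArrayConverter`.
///
/// A minimal usage sketch, assuming `page_iterator` is a `PageIterator` and
/// `column_desc` is the matching `ColumnDescPtr`:
///
/// ```ignore
/// let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
/// let mut reader =
///     ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)?;
/// let array = reader.next_batch(1024)?; // ArrayRef with up to 1024 values
/// ```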
pub struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
column_desc: ColumnDescPtr,
data_type: ArrowType,
def_level_decoder: Box<dyn ValueDecoder + 'a>,
rep_level_decoder: Box<dyn ValueDecoder + 'a>,
value_decoder: Box<dyn ValueDecoder + 'a>,
last_def_levels: Option<Int16Array>,
last_rep_levels: Option<Int16Array>,
array_converter: C,
}
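
/// State shared between all pages of one column chunk; currently used to cache
/// the decoded dictionary values, if any.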
pub(crate) struct ColumnChunkContext {
dictionary_values: Option<Vec<ByteBufferPtr>>,
}
impl ColumnChunkContext {
fn new() -> Self {
Self {
dictionary_values: None,
}
}
fn set_dictionary(&mut self, dictionary_values: Vec<ByteBufferPtr>) {
self.dictionary_values = Some(dictionary_values);
}
}
type PageDecoderTuple = (
Box<dyn ValueDecoder>,
Box<dyn ValueDecoder>,
Box<dyn ValueDecoder>,
);
impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
pub fn try_new<P: PageIterator + 'a>(
column_chunk_iterator: P,
column_desc: ColumnDescPtr,
array_converter: C,
arrow_type: Option<ArrowType>,
) -> Result<Self> {
let data_type = match arrow_type {
Some(t) => t,
None => parquet_to_arrow_field(column_desc.as_ref())?
.data_type()
.clone(),
};
type PageIteratorItem = Result<(Page, Rc<RefCell<ColumnChunkContext>>)>;
let page_iter = column_chunk_iterator
// build iterator of pages across column chunks
.flat_map(|x| -> Box<dyn Iterator<Item = PageIteratorItem>> {
// attach column chunk context
let context = Rc::new(RefCell::new(ColumnChunkContext::new()));
match x {
Ok(page_reader) => Box::new(
page_reader.map(move |pr| pr.map(|p| (p, context.clone()))),
),
// errors from reading column chunks / row groups are propagated to page level
Err(e) => Box::new(std::iter::once(Err(e))),
}
});
// capture a clone of column_desc in the closure so that it can outlive the current function
let map_page_fn_factory = |column_desc: ColumnDescPtr| {
move |x: Result<(Page, Rc<RefCell<ColumnChunkContext>>)>| {
x.and_then(|(page, context)| {
Self::map_page(page, context, column_desc.as_ref())
})
}
};
let map_page_fn = map_page_fn_factory(column_desc.clone());
// map the page iterator into an iterator of decoder tuples for (values, def levels, rep levels)
// errors from lower levels are surfaced through the decoders in each tuple
let decoder_iter = page_iter.map(map_page_fn).map(|x| match x {
Ok(iter_tuple) => iter_tuple,
// errors from reading pages are propagated to decoder iterator level
Err(e) => Self::map_page_error(e),
});
// split tuple iterator into separate iterators for (values, def levels, rep levels)
let (value_iter, def_level_iter, rep_level_iter) = unzip_iter(decoder_iter);
Ok(Self {
column_desc,
data_type,
def_level_decoder: Box::new(CompositeValueDecoder::new(def_level_iter)),
rep_level_decoder: Box::new(CompositeValueDecoder::new(rep_level_iter)),
value_decoder: Box::new(CompositeValueDecoder::new(value_iter)),
last_def_levels: None,
last_rep_levels: None,
array_converter,
})
}
#[inline]
fn def_levels_available(column_desc: &ColumnDescriptor) -> bool {
column_desc.max_def_level() > 0
}
#[inline]
fn rep_levels_available(column_desc: &ColumnDescriptor) -> bool {
column_desc.max_rep_level() > 0
}
fn map_page_error(err: ParquetError) -> PageDecoderTuple {
(
Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
Box::new(<dyn ValueDecoder>::once(Err(err))),
)
}
// Splits a page into a tuple of value decoders for (values, def levels, rep levels).
// This method can fail, e.g. if the page encoding is not supported.
fn map_page(
page: Page,
column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
column_desc: &ColumnDescriptor,
) -> Result<PageDecoderTuple> {
use crate::encodings::levels::LevelDecoder;
match page {
Page::DictionaryPage {
buf,
num_values,
encoding,
..
} => {
let mut column_chunk_context = column_chunk_context.borrow_mut();
if column_chunk_context.dictionary_values.is_some() {
return Err(general_err!(
"Column chunk cannot have more than one dictionary"
));
}
// create plain decoder for dictionary values
let mut dict_decoder = Self::get_dictionary_page_decoder(
buf,
num_values as usize,
encoding,
column_desc,
)?;
// decode and cache dictionary values
let dictionary_values = dict_decoder.read_dictionary_values()?;
column_chunk_context.set_dictionary(dictionary_values);
// a dictionary page doesn't return any values
Ok((
Box::new(<dyn ValueDecoder>::empty()),
Box::new(<dyn ValueDecoder>::empty()),
Box::new(<dyn ValueDecoder>::empty()),
))
}
Page::DataPage {
buf,
num_values,
encoding,
def_level_encoding,
rep_level_encoding,
statistics: _,
} => {
let mut buffer_ptr = buf;
// create rep level decoder iterator
let rep_level_iter: Box<dyn ValueDecoder> =
if Self::rep_levels_available(column_desc) {
let mut rep_decoder = LevelDecoder::v1(
rep_level_encoding,
column_desc.max_rep_level(),
);
let rep_level_byte_len =
rep_decoder.set_data(num_values as usize, buffer_ptr.all());
// advance buffer pointer
buffer_ptr = buffer_ptr.start_from(rep_level_byte_len);
Box::new(LevelValueDecoder::new(rep_decoder))
} else {
Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
"rep levels are not available".to_string(),
))))
};
// create def level decoder iterator
let def_level_iter: Box<dyn ValueDecoder> =
if Self::def_levels_available(column_desc) {
let mut def_decoder = LevelDecoder::v1(
def_level_encoding,
column_desc.max_def_level(),
);
let def_levels_byte_len =
def_decoder.set_data(num_values as usize, buffer_ptr.all());
// advance buffer pointer
buffer_ptr = buffer_ptr.start_from(def_levels_byte_len);
Box::new(LevelValueDecoder::new(def_decoder))
} else {
Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
"def levels are not available".to_string(),
))))
};
// create value decoder iterator
let value_iter = Self::get_value_decoder(
buffer_ptr,
num_values as usize,
encoding,
column_desc,
column_chunk_context,
)?;
Ok((value_iter, def_level_iter, rep_level_iter))
}
Page::DataPageV2 {
buf,
num_values,
encoding,
num_nulls: _,
num_rows: _,
def_levels_byte_len,
rep_levels_byte_len,
is_compressed: _,
statistics: _,
} => {
let mut offset = 0;
// create rep level decoder iterator
let rep_level_iter: Box<dyn ValueDecoder> =
if Self::rep_levels_available(column_desc) {
let rep_levels_byte_len = rep_levels_byte_len as usize;
let mut rep_decoder =
LevelDecoder::v2(column_desc.max_rep_level());
rep_decoder.set_data_range(
num_values as usize,
&buf,
offset,
rep_levels_byte_len,
);
offset += rep_levels_byte_len;
Box::new(LevelValueDecoder::new(rep_decoder))
} else {
Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
"rep levels are not available".to_string(),
))))
};
// create def level decoder iterator
let def_level_iter: Box<dyn ValueDecoder> =
if Self::def_levels_available(column_desc) {
let def_levels_byte_len = def_levels_byte_len as usize;
let mut def_decoder =
LevelDecoder::v2(column_desc.max_def_level());
def_decoder.set_data_range(
num_values as usize,
&buf,
offset,
def_levels_byte_len,
);
offset += def_levels_byte_len;
Box::new(LevelValueDecoder::new(def_decoder))
} else {
Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
"def levels are not available".to_string(),
))))
};
// create value decoder iterator
let values_buffer = buf.start_from(offset);
let value_iter = Self::get_value_decoder(
values_buffer,
num_values as usize,
encoding,
column_desc,
column_chunk_context,
)?;
Ok((value_iter, def_level_iter, rep_level_iter))
}
}
}
fn get_dictionary_page_decoder(
values_buffer: ByteBufferPtr,
num_values: usize,
mut encoding: Encoding,
column_desc: &ColumnDescriptor,
) -> Result<Box<dyn DictionaryValueDecoder>> {
if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
encoding = Encoding::RLE_DICTIONARY
}
if encoding == Encoding::RLE_DICTIONARY {
Ok(
Self::get_plain_value_decoder(values_buffer, num_values, column_desc)
.into_dictionary_decoder(),
)
} else {
Err(nyi_err!(
"Invalid/Unsupported encoding type for dictionary: {}",
encoding
))
}
}
fn get_value_decoder(
values_buffer: ByteBufferPtr,
num_values: usize,
mut encoding: Encoding,
column_desc: &ColumnDescriptor,
column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
) -> Result<Box<dyn ValueDecoder>> {
if encoding == Encoding::PLAIN_DICTIONARY {
encoding = Encoding::RLE_DICTIONARY;
}
match encoding {
Encoding::PLAIN => {
Ok(
Self::get_plain_value_decoder(values_buffer, num_values, column_desc)
.into_value_decoder(),
)
}
Encoding::RLE_DICTIONARY => {
if column_chunk_context.borrow().dictionary_values.is_some() {
let value_bit_len = Self::get_column_physical_bit_len(column_desc);
let dictionary_decoder: Box<dyn ValueDecoder> = if value_bit_len == 0
{
Box::new(VariableLenDictionaryDecoder::new(
column_chunk_context,
values_buffer,
num_values,
))
} else {
Box::new(FixedLenDictionaryDecoder::new(
column_chunk_context,
values_buffer,
num_values,
value_bit_len,
))
};
Ok(dictionary_decoder)
} else {
Err(general_err!("Dictionary values have not been initialized."))
}
}
// Encoding::RLE => Box::new(RleValueDecoder::new()),
// Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
// Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
// Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
e => return Err(nyi_err!("Encoding {} is not supported", e)),
}
}
fn get_column_physical_bit_len(column_desc: &ColumnDescriptor) -> usize {
use crate::basic::Type as PhysicalType;
// parquet only supports a limited number of physical types
// later converters cast to a more specific arrow / logical type if necessary
match column_desc.physical_type() {
PhysicalType::BOOLEAN => 1,
PhysicalType::INT32 | PhysicalType::FLOAT => 32,
PhysicalType::INT64 | PhysicalType::DOUBLE => 64,
PhysicalType::INT96 => 96,
PhysicalType::BYTE_ARRAY => 0,
PhysicalType::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize * 8,
}
}
fn get_plain_value_decoder(
values_buffer: ByteBufferPtr,
num_values: usize,
column_desc: &ColumnDescriptor,
) -> Box<dyn PlainValueDecoder> {
let value_bit_len = Self::get_column_physical_bit_len(column_desc);
if value_bit_len == 0 {
Box::new(VariableLenPlainDecoder::new(values_buffer, num_values))
} else {
Box::new(FixedLenPlainDecoder::new(
values_buffer,
num_values,
value_bit_len,
))
}
}
fn build_level_array(
level_decoder: &mut impl ValueDecoder,
batch_size: usize,
) -> Result<Int16Array> {
use arrow::datatypes::Int16Type;
let level_converter = PrimitiveArrayConverter::<Int16Type>::new();
let array_data =
level_converter.convert_value_bytes(level_decoder, batch_size)?;
Ok(Int16Array::from(array_data))
}
}
impl<C: ArrayConverter> ArrayReader for ArrowArrayReader<'static, C> {
fn as_any(&self) -> &dyn Any {
self
}
fn get_data_type(&self) -> &ArrowType {
&self.data_type
}
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
if Self::rep_levels_available(&self.column_desc) {
// read rep levels if available
let rep_level_array =
Self::build_level_array(&mut self.rep_level_decoder, batch_size)?;
self.last_rep_levels = Some(rep_level_array);
}
// check if def levels are available
let (values_to_read, null_bitmap_array) =
if !Self::def_levels_available(&self.column_desc) {
// if no def levels - just read (up to) batch_size values
(batch_size, None)
} else {
// if def levels are available - they determine how many values will be read
// decode def levels, return first error if any
let def_level_array =
Self::build_level_array(&mut self.def_level_decoder, batch_size)?;
let def_level_count = def_level_array.len();
// use eq_scalar to efficiently build null bitmap array from def levels
let null_bitmap_array = arrow::compute::eq_scalar(
&def_level_array,
self.column_desc.max_def_level(),
)?;
self.last_def_levels = Some(def_level_array);
// efficiently calculate values to read
let values_to_read = null_bitmap_array
.values()
.count_set_bits_offset(0, def_level_count);
let maybe_null_bitmap = if values_to_read != null_bitmap_array.len() {
Some(null_bitmap_array)
} else {
// shortcut if no NULLs
None
};
(values_to_read, maybe_null_bitmap)
};
// read a batch of values
// the converter only creates array data without nulls; null slots are spliced in below
let mut value_array_data = self
.array_converter
.convert_value_bytes(&mut self.value_decoder, values_to_read)?;
if let Some(null_bitmap_array) = null_bitmap_array {
// Def levels are available, so insert null values efficiently using MutableArrayData.
// This copies the value bytes again, but keeps the converter requirements simple.
// With a small number of NULLs, this amounts to only a few copies of large byte sequences.
let actual_batch_size = null_bitmap_array.len();
// use_nulls is false, because null_bitmap_array is already calculated and re-used
let mut mutable = arrow::array::MutableArrayData::new(
vec![&value_array_data],
false,
actual_batch_size,
);
// SlicesIterator yields only the ranges of set (i.e. non-null) bits; NULLs are inserted to fill any gaps
arrow::compute::SlicesIterator::new(&null_bitmap_array).for_each(
|(start, end)| {
// the gap needs to be filled with NULLs
if start > mutable.len() {
let nulls_to_add = start - mutable.len();
mutable.extend_nulls(nulls_to_add);
}
// fill values, adjust start and end with NULL count so far
let nulls_added = mutable.null_count();
mutable.extend(0, start - nulls_added, end - nulls_added);
},
);
// any remaining slots are NULLs
if mutable.len() < actual_batch_size {
let nulls_to_add = actual_batch_size - mutable.len();
mutable.extend_nulls(nulls_to_add);
}
value_array_data = mutable
.into_builder()
.null_bit_buffer(null_bitmap_array.values().clone())
.build();
}
let mut array = arrow::array::make_array(value_array_data);
if array.data_type() != &self.data_type {
// cast array to self.data_type if necessary
array = arrow::compute::cast(&array, &self.data_type)?
}
Ok(array)
}
fn get_def_levels(&self) -> Option<&[i16]> {
self.last_def_levels.as_ref().map(|x| x.values())
}
fn get_rep_levels(&self) -> Option<&[i16]> {
self.last_rep_levels.as_ref().map(|x| x.values())
}
}
use crate::encodings::rle::RleDecoder;
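
/// A source of raw value bytes. `read_value_bytes` calls `read_bytes` one or
/// more times, passing decoded value bytes together with the number of values
/// they represent, and returns the total number of values read; a result of 0
/// means the decoder is exhausted.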
pub trait ValueDecoder {
fn read_value_bytes(
&mut self,
num_values: usize,
read_bytes: &mut dyn FnMut(&[u8], usize),
) -> Result<usize>;
}
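
/// Decodes all values of a dictionary page into a vector of value byte buffers.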
trait DictionaryValueDecoder {
fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>>;
}
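
/// A plain decoder that can act as either a value decoder or a dictionary
/// decoder; the conversion methods upcast `Box<Self>` into the respective
/// trait object.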
trait PlainValueDecoder: ValueDecoder + DictionaryValueDecoder {
fn into_value_decoder(self: Box<Self>) -> Box<dyn ValueDecoder>;
fn into_dictionary_decoder(self: Box<Self>) -> Box<dyn DictionaryValueDecoder>;
}
impl<T> PlainValueDecoder for T
where
T: ValueDecoder + DictionaryValueDecoder + 'static,
{
fn into_value_decoder(self: Box<T>) -> Box<dyn ValueDecoder> {
self
}
fn into_dictionary_decoder(self: Box<T>) -> Box<dyn DictionaryValueDecoder> {
self
}
}
impl dyn ValueDecoder {
fn empty() -> impl ValueDecoder {
SingleValueDecoder::new(Ok(0))
}
fn once(value: Result<usize>) -> impl ValueDecoder {
SingleValueDecoder::new(value)
}
}
impl ValueDecoder for Box<dyn ValueDecoder> {
#[inline]
fn read_value_bytes(
&mut self,
num_values: usize,
read_bytes: &mut dyn FnMut(&[u8], usize),
) -> Result<usize> {
self.as_mut().read_value_bytes(num_values, read_bytes)
}
}
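
/// A `ValueDecoder` that reads no bytes and returns the same stored result on
/// every call; used for empty pages and for propagating errors.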
struct SingleValueDecoder {
value: Result<usize>,
}
impl SingleValueDecoder {
fn new(value: Result<usize>) -> Self {
Self { value }
}
}
impl ValueDecoder for SingleValueDecoder {
fn read_value_bytes(
&mut self,
_num_values: usize,
_read_bytes: &mut dyn FnMut(&[u8], usize),
) -> Result<usize> {
self.value.clone()
}
}
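
/// Chains the `ValueDecoder`s produced by an iterator (one per page) into a
/// single decoder, moving to the next decoder whenever the current one is
/// exhausted.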
struct CompositeValueDecoder<I: Iterator<Item = Box<dyn ValueDecoder>>> {
current_decoder: Option<Box<dyn ValueDecoder>>,
decoder_iter: I,
}
impl<I: Iterator<Item = Box<dyn ValueDecoder>>> CompositeValueDecoder<I> {
fn new(mut decoder_iter: I) -> Self {
let current_decoder = decoder_iter.next();
Self {
current_decoder,
decoder_iter,
}
}
}
impl<I: Iterator<Item = Box<dyn ValueDecoder>>> ValueDecoder
for CompositeValueDecoder<I>
{
fn read_value_bytes(
&mut self,
num_values: usize,
read_bytes: &mut dyn FnMut(&[u8], usize),
) -> Result<usize> {
let mut values_to_read = num_values;
while values_to_read > 0 {
let value_decoder = match self.current_decoder.as_mut() {
Some(d) => d,
// no more decoders
None => break,
};
while values_to_read > 0 {
let values_read =
value_decoder.read_value_bytes(values_to_read, read_bytes)?;
if values_read > 0 {
values_to_read -= values_read;
} else {
// no more values in current decoder
self.current_decoder = self.decoder_iter.next();
break;
}
}
}
Ok(num_values - values_to_read)
}
}
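
/// Adapts a `LevelDecoder` to the `ValueDecoder` interface by decoding levels
/// into an `i16` buffer and exposing them as raw bytes.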
struct LevelValueDecoder {
level_decoder: crate::encodings::levels::LevelDecoder,
level_value_buffer: Vec<i16>,
}
impl LevelValueDecoder {
fn new(level_decoder: crate::encodings::levels::LevelDecoder) -> Self {
Self {
level_decoder,
level_value_buffer: vec![0i16; 2048],
}
}
}
impl ValueDecoder for LevelValueDecoder {
fn read_value_bytes(
&mut self,
num_values: usize,
read_bytes: &mut dyn FnMut(&[u8], usize),
) -> Result<usize> {
let value_size = std::mem::size_of::<i16>();
let mut total_values_read = 0;
while total_values_read < num_values {
let values_to_read = std::cmp::min(
num_values - total_values_read,
self.level_value_buffer.len(),
);
let values_read = match self
.level_decoder
.get(&mut self.level_value_buffer[..values_to_read])
{
Ok(values_read) => values_read,
Err(e) => return Err(e),
};
if values_read > 0 {
let level_value_bytes =
&self.level_value_buffer.to_byte_slice()[..values_read * value_size];
read_bytes(level_value_bytes, values_read);
total_values_read += values_read;
} else {
break;
}
}
Ok(total_values_read)
}
}
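
/// Plain decoder for fixed-length values; value bytes are passed on directly
/// from the page buffer.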
pub(crate) struct FixedLenPlainDecoder {
data: ByteBufferPtr,
num_values: usize,
value_bit_len: usize,
}
impl FixedLenPlainDecoder {
pub(crate) fn new(
data: ByteBufferPtr,
num_values: usize,
value_bit_len: usize,
) -> Self {
Self {
data,
num_values,
value_bit_len,
}
}
}
impl DictionaryValueDecoder for FixedLenPlainDecoder {
fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
let value_byte_len = self.value_bit_len / 8;
let available_values = self.data.len() / value_byte_len;
let values_to_read = std::cmp::min(available_values, self.num_values);
let byte_len = values_to_read * value_byte_len;
let values = vec![self.data.range(0, byte_len)];
self.num_values = 0;
self.data.set_range(self.data.start(), 0);
Ok(values)
}
}
impl ValueDecoder for FixedLenPlainDecoder {
fn read_value_bytes(
&mut self,
num_values: usize,
read_bytes: &mut dyn FnMut(&[u8], usize),
) -> Result<usize> {
let available_values = self.data.len() * 8 / self.value_bit_len;
if available_values > 0 {
let values_to_read = std::cmp::min(available_values, num_values);
let byte_len = values_to_read * self.value_bit_len / 8;
read_bytes(&self.data.data()[..byte_len], values_to_read);
self.data
.set_range(self.data.start() + byte_len, self.data.len() - byte_len);
Ok(values_to_read)
} else {
Ok(0)
}
}
}
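
/// Plain decoder for variable-length (BYTE_ARRAY) values, where each value is
/// prefixed with its length as a little-endian `u32`.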
pub(crate) struct VariableLenPlainDecoder {
data: ByteBufferPtr,
num_values: usize,
position: usize,
}
impl VariableLenPlainDecoder {
pub(crate) fn new(data: ByteBufferPtr, num_values: usize) -> Self {
Self {
data,
num_values,
position: 0,
}
}
}
impl DictionaryValueDecoder for VariableLenPlainDecoder {
fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
const LEN_SIZE: usize = std::mem::size_of::<u32>();
let data = self.data.data();
let data_len = data.len();
let values_to_read = self.num_values;
let mut values = Vec::with_capacity(values_to_read);
let mut values_read = 0;
while self.position < data_len && values_read < values_to_read {
let len: usize =
read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize;
self.position += LEN_SIZE;
if data_len < self.position + len {
return Err(eof_err!("Not enough bytes to decode"));
}
values.push(self.data.range(self.position, len));
self.position += len;
values_read += 1;
}
self.num_values -= values_read;
Ok(values)
}
}
impl ValueDecoder for VariableLenPlainDecoder {
fn read_value_bytes(
&mut self,
num_values: usize,
read_bytes: &mut dyn FnMut(&[u8], usize),
) -> Result<usize> {
const LEN_SIZE: usize = std::mem::size_of::<u32>();
let data = self.data.data();
let data_len = data.len();
let values_to_read = std::cmp::min(self.num_values, num_values);
let mut values_read = 0;
while self.position < data_len && values_read < values_to_read {
let len: usize =
read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize;
self.position += LEN_SIZE;
if data_len < self.position + len {
return Err(eof_err!("Not enough bytes to decode"));
}
read_bytes(&data[self.position..][..len], 1);
self.position += len;
values_read += 1;
}
self.num_values -= values_read;
Ok(values_read)
}
}
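
/// Dictionary decoder for fixed-length values: decodes RLE/bit-packed keys and
/// looks up the value bytes in the column chunk's cached dictionary.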
pub(crate) struct FixedLenDictionaryDecoder {
context_ref: Rc<RefCell<ColumnChunkContext>>,
key_data_buffer: ByteBufferPtr,
num_values: usize,
rle_decoder: RleDecoder,
value_byte_len: usize,
keys_buffer: Vec<i32>,
}
impl FixedLenDictionaryDecoder {
pub(crate) fn new(
column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
key_data_buffer: ByteBufferPtr,
num_values: usize,
value_bit_len: usize,
) -> Self {
assert!(
value_bit_len % 8 == 0,
"value_bit_size must be a multiple of 8"
);
// First byte in `key_data_buffer` is the bit width of the keys
let bit_width = key_data_buffer.data()[0];
let mut rle_decoder = RleDecoder::new(bit_width);
rle_decoder.set_data(key_data_buffer.start_from(1));
Self {
context_ref: column_chunk_context,
key_data_buffer,
num_values,
rle_decoder,
value_byte_len: value_bit_len / 8,
keys_buffer: vec![0; 2048],
}
}
}
impl ValueDecoder for FixedLenDictionaryDecoder {
fn read_value_bytes(
&mut self,
num_values: usize,
read_bytes: &mut dyn FnMut(&[u8], usize),
) -> Result<usize> {
if self.num_values == 0 {
return Ok(0);
}
let context = self.context_ref.borrow();
let values = context.dictionary_values.as_ref().unwrap();
let input_value_bytes = values[0].data();
// read no more than available values or requested values
let values_to_read = std::cmp::min(self.num_values, num_values);
let mut values_read = 0;
while values_read < values_to_read {
// read values in batches of up to self.keys_buffer.len()
let keys_to_read =
std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
let keys_read = match self
.rle_decoder
.get_batch(&mut self.keys_buffer[..keys_to_read])
{
Ok(keys_read) => keys_read,
Err(e) => return Err(e),
};
if keys_read == 0 {
self.num_values = 0;
return Ok(values_read);
}
for i in 0..keys_read {
let key = self.keys_buffer[i] as usize;
read_bytes(
&input_value_bytes[key * self.value_byte_len..]
[..self.value_byte_len],
1,
);
}
values_read += keys_read;
}
self.num_values -= values_read;
Ok(values_read)
}
}
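
/// Dictionary decoder for variable-length values: decodes RLE/bit-packed keys
/// and looks up the value buffers in the column chunk's cached dictionary.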
pub(crate) struct VariableLenDictionaryDecoder {
context_ref: Rc<RefCell<ColumnChunkContext>>,
key_data_buffer: ByteBufferPtr,
num_values: usize,
rle_decoder: RleDecoder,
keys_buffer: Vec<i32>,
}
impl VariableLenDictionaryDecoder {
pub(crate) fn new(
column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
key_data_buffer: ByteBufferPtr,
num_values: usize,
) -> Self {
// First byte in `key_data_buffer` is the bit width of the keys
let bit_width = key_data_buffer.data()[0];
let mut rle_decoder = RleDecoder::new(bit_width);
rle_decoder.set_data(key_data_buffer.start_from(1));
Self {
context_ref: column_chunk_context,
key_data_buffer,
num_values,
rle_decoder,
keys_buffer: vec![0; 2048],
}
}
}
impl ValueDecoder for VariableLenDictionaryDecoder {
fn read_value_bytes(
&mut self,
num_values: usize,
read_bytes: &mut dyn FnMut(&[u8], usize),
) -> Result<usize> {
if self.num_values == 0 {
return Ok(0);
}
let context = self.context_ref.borrow();
let values = context.dictionary_values.as_ref().unwrap();
let values_to_read = std::cmp::min(self.num_values, num_values);
let mut values_read = 0;
while values_read < values_to_read {
// read values in batches of up to self.keys_buffer.len()
let keys_to_read =
std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
let keys_read = match self
.rle_decoder
.get_batch(&mut self.keys_buffer[..keys_to_read])
{
Ok(keys_read) => keys_read,
Err(e) => return Err(e),
};
if keys_read == 0 {
self.num_values = 0;
return Ok(values_read);
}
for i in 0..keys_read {
let key = self.keys_buffer[i] as usize;
read_bytes(values[key].data(), 1);
}
values_read += keys_read;
}
self.num_values -= values_read;
Ok(values_read)
}
}
use arrow::datatypes::ArrowPrimitiveType;
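
/// Builds array data for primitive Arrow types by copying fixed-width value
/// bytes straight into the values buffer.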
pub struct PrimitiveArrayConverter<T: ArrowPrimitiveType> {
_phantom_data: PhantomData<T>,
}
impl<T: ArrowPrimitiveType> PrimitiveArrayConverter<T> {
pub fn new() -> Self {
Self {
_phantom_data: PhantomData,
}
}
}
impl<T: ArrowPrimitiveType> ArrayConverter for PrimitiveArrayConverter<T> {
fn convert_value_bytes(
&self,
value_decoder: &mut impl ValueDecoder,
num_values: usize,
) -> Result<arrow::array::ArrayData> {
let value_size = T::get_byte_width();
let values_byte_capacity = num_values * value_size;
let mut values_buffer = MutableBuffer::new(values_byte_capacity);
value_decoder.read_value_bytes(num_values, &mut |value_bytes, _| {
values_buffer.extend_from_slice(value_bytes);
})?;
// calculate the actual value count, which may be smaller than num_values if the decoder was exhausted
let value_count = values_buffer.len() / value_size;
let array_data = arrow::array::ArrayData::builder(T::DATA_TYPE)
.len(value_count)
.add_buffer(values_buffer.into())
.build();
Ok(array_data)
}
}
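
/// Builds `Utf8` array data by accumulating value bytes and building the
/// offsets buffer as values arrive.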
pub struct StringArrayConverter {}
impl StringArrayConverter {
pub fn new() -> Self {
Self {}
}
}
impl ArrayConverter for StringArrayConverter {
fn convert_value_bytes(
&self,
value_decoder: &mut impl ValueDecoder,
num_values: usize,
) -> Result<arrow::array::ArrayData> {
use arrow::datatypes::ArrowNativeType;
let offset_size = std::mem::size_of::<i32>();
let mut offsets_buffer = MutableBuffer::new((num_values + 1) * offset_size);
// allocate initial capacity of 1 byte for each item
let values_byte_capacity = num_values;
let mut values_buffer = MutableBuffer::new(values_byte_capacity);
let mut length_so_far = i32::default();
offsets_buffer.push(length_so_far);
value_decoder.read_value_bytes(num_values, &mut |value_bytes, values_read| {
debug_assert_eq!(
values_read, 1,
"offset length value buffers can only contain bytes for a single value"
);
length_so_far +=
<i32 as ArrowNativeType>::from_usize(value_bytes.len()).unwrap();
// this should be safe because a ValueDecoder should not read more than num_values
unsafe {
offsets_buffer.push_unchecked(length_so_far);
}
values_buffer.extend_from_slice(value_bytes);
})?;
// calculate the actual data_len, which may be smaller than num_values if the decoder was exhausted
let data_len = (offsets_buffer.len() / offset_size) - 1;
let array_data = arrow::array::ArrayData::builder(ArrowType::Utf8)
.len(data_len)
.add_buffer(offsets_buffer.into())
.add_buffer(values_buffer.into())
.build();
Ok(array_data)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::column::page::Page;
use crate::data_type::ByteArray;
use crate::data_type::ByteArrayType;
use crate::schema::parser::parse_message_type;
use crate::schema::types::SchemaDescriptor;
use crate::util::test_common::page_util::{
DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
};
use crate::{
basic::Encoding, column::page::PageReader, schema::types::SchemaDescPtr,
};
use arrow::array::{PrimitiveArray, StringArray};
use arrow::datatypes::Int32Type as ArrowInt32;
use rand::{distributions::uniform::SampleUniform, thread_rng, Rng};
use std::sync::Arc;
/// Iterator for testing reading empty columns
struct EmptyPageIterator {
schema: SchemaDescPtr,
}
impl EmptyPageIterator {
fn new(schema: SchemaDescPtr) -> Self {
EmptyPageIterator { schema }
}
}
impl Iterator for EmptyPageIterator {
type Item = Result<Box<dyn PageReader>>;
fn next(&mut self) -> Option<Self::Item> {
None
}
}
impl PageIterator for EmptyPageIterator {
fn schema(&mut self) -> Result<SchemaDescPtr> {
Ok(self.schema.clone())
}
fn column_schema(&mut self) -> Result<ColumnDescPtr> {
Ok(self.schema.column(0))
}
}
#[test]
fn test_array_reader_empty_pages() {
// Construct column schema
let message_type = "
message test_schema {
REQUIRED INT32 leaf;
}
";
let schema = parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
let column_desc = schema.column(0);
let page_iterator = EmptyPageIterator::new(schema);
let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
let mut array_reader =
ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
.unwrap();
// expect no values to be read
let array = array_reader.next_batch(50).unwrap();
assert!(array.is_empty());
}
fn make_column_chunks<T: crate::data_type::DataType>(
column_desc: ColumnDescPtr,
encoding: Encoding,
num_levels: usize,
min_value: T::T,
max_value: T::T,
def_levels: &mut Vec<i16>,
rep_levels: &mut Vec<i16>,
values: &mut Vec<T::T>,
page_lists: &mut Vec<Vec<Page>>,
use_v2: bool,
num_chunks: usize,
) where
T::T: PartialOrd + SampleUniform + Copy,
{
for _i in 0..num_chunks {
let mut pages = VecDeque::new();
let mut data = Vec::new();
let mut page_def_levels = Vec::new();
let mut page_rep_levels = Vec::new();
crate::util::test_common::make_pages::<T>(
column_desc.clone(),
encoding,
1,
num_levels,
min_value,
max_value,
&mut page_def_levels,
&mut page_rep_levels,
&mut data,
&mut pages,
use_v2,
);
def_levels.append(&mut page_def_levels);
rep_levels.append(&mut page_rep_levels);
values.append(&mut data);
page_lists.push(Vec::from(pages));
}
}
#[test]
fn test_primitive_array_reader_data() {
// Construct column schema
let message_type = "
message test_schema {
REQUIRED INT32 leaf;
}
";
let schema = parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
let column_desc = schema.column(0);
// Construct page iterator
{
let mut data = Vec::new();
let mut page_lists = Vec::new();
make_column_chunks::<crate::data_type::Int32Type>(
column_desc.clone(),
Encoding::PLAIN,
100,
1,
200,
&mut Vec::new(),
&mut Vec::new(),
&mut data,
&mut page_lists,
true,
2,
);
let page_iterator =
InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
let mut array_reader =
ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
.unwrap();
// Read first 50 values, which are all from the first column chunk
let array = array_reader.next_batch(50).unwrap();
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<ArrowInt32>>()
.unwrap();
assert_eq!(
&PrimitiveArray::<ArrowInt32>::from(data[0..50].to_vec()),
array
);
// Read next 100 values, the first 50 ones are from the first column chunk,
// and the last 50 ones are from the second column chunk
let array = array_reader.next_batch(100).unwrap();
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<ArrowInt32>>()
.unwrap();
assert_eq!(
&PrimitiveArray::<ArrowInt32>::from(data[50..150].to_vec()),
array
);
// Try to read 100 values, however there are only 50 values
let array = array_reader.next_batch(100).unwrap();
let array = array
.as_any()
.downcast_ref::<PrimitiveArray<ArrowInt32>>()
.unwrap();
assert_eq!(
&PrimitiveArray::<ArrowInt32>::from(data[150..200].to_vec()),
array
);
}
}
#[test]
fn test_primitive_array_reader_def_and_rep_levels() {
// Construct column schema
let message_type = "
message test_schema {
REPEATED Group test_mid {
OPTIONAL INT32 leaf;
}
}
";
let schema = parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
let column_desc = schema.column(0);
// Construct page iterator
{
let mut def_levels = Vec::new();
let mut rep_levels = Vec::new();
let mut page_lists = Vec::new();
make_column_chunks::<crate::data_type::Int32Type>(
column_desc.clone(),
Encoding::PLAIN,
100,
1,
200,
&mut def_levels,
&mut rep_levels,
&mut Vec::new(),
&mut page_lists,
true,
2,
);
let page_iterator =
InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
let mut array_reader =
ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
.unwrap();
let mut accu_len: usize = 0;
// Read first 50 values, which are all from the first column chunk
let array = array_reader.next_batch(50).unwrap();
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
accu_len += array.len();
// Read next 100 values, the first 50 ones are from the first column chunk,
// and the last 50 ones are from the second column chunk
let array = array_reader.next_batch(100).unwrap();
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
accu_len += array.len();
// Try to read 100 values, however there are only 50 values
let array = array_reader.next_batch(100).unwrap();
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
assert_eq!(accu_len + array.len(), 200);
}
}
#[test]
fn test_arrow_array_reader_string() {
// Construct column schema
let message_type = "
message test_schema {
REPEATED Group test_mid {
OPTIONAL BYTE_ARRAY leaf (UTF8);
}
}
";
let num_pages = 2;
let values_per_page = 100;
let str_base = "Hello World";
let schema = parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
let column_desc = schema.column(0);
let max_def_level = column_desc.max_def_level();
let max_rep_level = column_desc.max_rep_level();
assert_eq!(max_def_level, 2);
assert_eq!(max_rep_level, 1);
let mut rng = thread_rng();
let mut pages: Vec<Vec<Page>> = Vec::new();
let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
let mut all_values = Vec::with_capacity(num_pages * values_per_page);
for i in 0..num_pages {
let mut values = Vec::with_capacity(values_per_page);
for _ in 0..values_per_page {
let def_level = rng.gen_range(0..max_def_level + 1);
let rep_level = rng.gen_range(0..max_rep_level + 1);
if def_level == max_def_level {
let len = rng.gen_range(1..str_base.len());
let slice = &str_base[..len];
values.push(ByteArray::from(slice));
all_values.push(Some(slice.to_string()));
} else {
all_values.push(None)
}
rep_levels.push(rep_level);
def_levels.push(def_level)
}
let range = i * values_per_page..(i + 1) * values_per_page;
let mut pb =
DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
pb.add_values::<ByteArrayType>(Encoding::PLAIN, values.as_slice());
let data_page = pb.consume();
pages.push(vec![data_page]);
}
let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
let converter = StringArrayConverter::new();
let mut array_reader =
ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
.unwrap();
let mut accu_len: usize = 0;
let array = array_reader.next_batch(values_per_page / 2).unwrap();
assert_eq!(array.len(), values_per_page / 2);
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
accu_len += array.len();
// Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
// and the last values_per_page/2 ones are from the second column chunk
let array = array_reader.next_batch(values_per_page).unwrap();
assert_eq!(array.len(), values_per_page);
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
for i in 0..array.len() {
if array.is_valid(i) {
assert_eq!(
all_values[i + accu_len].as_ref().unwrap().as_str(),
strings.value(i)
)
} else {
assert_eq!(all_values[i + accu_len], None)
}
}
accu_len += array.len();
// Try to read values_per_page values, however there are only values_per_page/2 values
let array = array_reader.next_batch(values_per_page).unwrap();
assert_eq!(array.len(), values_per_page / 2);
assert_eq!(
Some(&def_levels[accu_len..(accu_len + array.len())]),
array_reader.get_def_levels()
);
assert_eq!(
Some(&rep_levels[accu_len..(accu_len + array.len())]),
array_reader.get_rep_levels()
);
}
}