Implement faster arrow array reader (#384)
* implement ArrowArrayReader
* change StringArrayConverter to use push_unchecked for offsets
* add ASF license header to new files
* fix clippy issues
* cleanup arrow_array_reader benches
* cleanup arrow_array_reader
* change util module to limit public exports from test_common sub-module
* fix rustfmt issues
diff --git a/arrow/src/array/data.rs b/arrow/src/array/data.rs
index 172bdaa..64767e3 100644
--- a/arrow/src/array/data.rs
+++ b/arrow/src/array/data.rs
@@ -506,6 +506,11 @@
self
}
+ pub fn null_count(mut self, null_count: usize) -> Self {
+ self.null_count = Some(null_count);
+ self
+ }
+
pub fn null_bit_buffer(mut self, buf: Buffer) -> Self {
self.null_bit_buffer = Some(buf);
self
diff --git a/arrow/src/array/transform/mod.rs b/arrow/src/array/transform/mod.rs
index 3fae220..0194b93 100644
--- a/arrow/src/array/transform/mod.rs
+++ b/arrow/src/array/transform/mod.rs
@@ -25,7 +25,7 @@
use super::{
data::{into_buffers, new_buffers},
- ArrayData,
+ ArrayData, ArrayDataBuilder,
};
use crate::array::StringOffsetSizeTrait;
@@ -63,7 +63,7 @@
}
impl<'a> _MutableArrayData<'a> {
- fn freeze(self, dictionary: Option<ArrayData>) -> ArrayData {
+ fn freeze(self, dictionary: Option<ArrayData>) -> ArrayDataBuilder {
let buffers = into_buffers(&self.data_type, self.buffer1, self.buffer2);
let child_data = match self.data_type {
@@ -76,19 +76,19 @@
child_data
}
};
- ArrayData::new(
- self.data_type,
- self.len,
- Some(self.null_count),
- if self.null_count > 0 {
- Some(self.null_buffer.into())
- } else {
- None
- },
- 0,
- buffers,
- child_data,
- )
+
+ let mut array_data_builder = ArrayDataBuilder::new(self.data_type)
+ .offset(0)
+ .len(self.len)
+ .null_count(self.null_count)
+ .buffers(buffers)
+ .child_data(child_data);
+ if self.null_count > 0 {
+ array_data_builder =
+ array_data_builder.null_bit_buffer(self.null_buffer.into());
+ }
+
+ array_data_builder
}
}
@@ -552,8 +552,13 @@
.map(|array| build_extend_null_bits(array, use_nulls))
.collect();
- let null_bytes = bit_util::ceil(array_capacity, 8);
- let null_buffer = MutableBuffer::from_len_zeroed(null_bytes);
+ let null_buffer = if use_nulls {
+ let null_bytes = bit_util::ceil(array_capacity, 8);
+ MutableBuffer::from_len_zeroed(null_bytes)
+ } else {
+ // create 0 capacity mutable buffer with the intention that it won't be used
+ MutableBuffer::with_capacity(0)
+ };
let extend_values = match &data_type {
DataType::Dictionary(_, _) => {
@@ -605,13 +610,40 @@
/// Extends this [MutableArrayData] with null elements, disregarding the bound arrays
pub fn extend_nulls(&mut self, len: usize) {
+ // TODO: null_buffer should probably be extended here as well
+ // otherwise is_valid() could later panic
+ // add test to confirm
self.data.null_count += len;
(self.extend_nulls)(&mut self.data, len);
self.data.len += len;
}
+ /// Returns the current length
+ #[inline]
+ pub fn len(&self) -> usize {
+ self.data.len
+ }
+
+ /// Returns true if len is 0
+ #[inline]
+ pub fn is_empty(&self) -> bool {
+ self.data.len == 0
+ }
+
+ /// Returns the current null count
+ #[inline]
+ pub fn null_count(&self) -> usize {
+ self.data.null_count
+ }
+
/// Creates a [ArrayData] from the pushed regions up to this point, consuming `self`.
pub fn freeze(self) -> ArrayData {
+ self.data.freeze(self.dictionary).build()
+ }
+
+ /// Creates a [ArrayDataBuilder] from the pushed regions up to this point, consuming `self`.
+ /// This is useful for extending the default behavior of MutableArrayData.
+ pub fn into_builder(self) -> ArrayDataBuilder {
self.data.freeze(self.dictionary)
}
}
diff --git a/arrow/src/buffer/mutable.rs b/arrow/src/buffer/mutable.rs
index 35b123d..7d336e0 100644
--- a/arrow/src/buffer/mutable.rs
+++ b/arrow/src/buffer/mutable.rs
@@ -327,10 +327,10 @@
}
/// Extends the buffer with a new item, without checking for sufficient capacity
- /// Safety
+ /// # Safety
/// Caller must ensure that the capacity()-len()>=size_of<T>()
#[inline]
- unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
+ pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
let additional = std::mem::size_of::<T>();
let dst = self.data.as_ptr().add(self.len) as *mut T;
std::ptr::write(dst, item);
diff --git a/arrow/src/compute/kernels/filter.rs b/arrow/src/compute/kernels/filter.rs
index b15692e..c5efadd 100644
--- a/arrow/src/compute/kernels/filter.rs
+++ b/arrow/src/compute/kernels/filter.rs
@@ -44,10 +44,10 @@
/// slots of a [BooleanArray] are true. Each interval corresponds to a contiguous region of memory to be
/// "taken" from an array to be filtered.
#[derive(Debug)]
-pub(crate) struct SlicesIterator<'a> {
+pub struct SlicesIterator<'a> {
iter: Enumerate<BitChunkIterator<'a>>,
state: State,
- filter_count: usize,
+ filter: &'a BooleanArray,
remainder_mask: u64,
remainder_len: usize,
chunk_len: usize,
@@ -59,19 +59,14 @@
}
impl<'a> SlicesIterator<'a> {
- pub(crate) fn new(filter: &'a BooleanArray) -> Self {
+ pub fn new(filter: &'a BooleanArray) -> Self {
let values = &filter.data_ref().buffers()[0];
-
- // this operation is performed before iteration
- // because it is fast and allows reserving all the needed memory
- let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());
-
let chunks = values.bit_chunks(filter.offset(), filter.len());
Self {
iter: chunks.iter().enumerate(),
state: State::Chunks,
- filter_count,
+ filter,
remainder_len: chunks.remainder_len(),
chunk_len: chunks.chunk_len(),
remainder_mask: chunks.remainder_bits(),
@@ -83,6 +78,12 @@
}
}
+ /// Counts the number of set bits in the filter array.
+ fn filter_count(&self) -> usize {
+ let values = self.filter.values();
+ values.count_set_bits_offset(self.filter.offset(), self.filter.len())
+ }
+
#[inline]
fn current_start(&self) -> usize {
self.current_chunk * 64 + self.current_bit
@@ -193,7 +194,7 @@
/// Therefore, it is considered undefined behavior to pass `filter` with null values.
pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
let iter = SlicesIterator::new(filter);
- let filter_count = iter.filter_count;
+ let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();
Ok(Box::new(move |array: &ArrayData| {
@@ -253,7 +254,8 @@
}
let iter = SlicesIterator::new(predicate);
- match iter.filter_count {
+ let filter_count = iter.filter_count();
+ match filter_count {
0 => {
// return empty
Ok(new_empty_array(array.data_type()))
@@ -266,7 +268,7 @@
_ => {
// actually filter
let mut mutable =
- MutableArrayData::new(vec![array.data_ref()], false, iter.filter_count);
+ MutableArrayData::new(vec![array.data_ref()], false, filter_count);
iter.for_each(|(start, end)| mutable.extend(0, start, end));
let data = mutable.freeze();
Ok(make_array(data))
@@ -599,7 +601,7 @@
let filter = BooleanArray::from(filter_values);
let iter = SlicesIterator::new(&filter);
- let filter_count = iter.filter_count;
+ let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();
assert_eq!(chunks, vec![(1, 2)]);
@@ -612,7 +614,7 @@
let filter = BooleanArray::from(filter_values);
let iter = SlicesIterator::new(&filter);
- let filter_count = iter.filter_count;
+ let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();
assert_eq!(chunks, vec![(0, 1), (2, 64)]);
@@ -625,7 +627,7 @@
let filter = BooleanArray::from(filter_values);
let iter = SlicesIterator::new(&filter);
- let filter_count = iter.filter_count;
+ let filter_count = iter.filter_count();
let chunks = iter.collect::<Vec<_>>();
assert_eq!(chunks, vec![(1, 62), (63, 124), (125, 130)]);
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 1e54047..d66ee23 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -45,6 +45,7 @@
base64 = { version = "0.13", optional = true }
clap = { version = "2.33.3", optional = true }
serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
+rand = "0.8"
[dev-dependencies]
criterion = "0.3"
@@ -76,3 +77,7 @@
[[bench]]
name = "arrow_writer"
harness = false
+
+[[bench]]
+name = "arrow_array_reader"
+harness = false
\ No newline at end of file
diff --git a/parquet/benches/arrow_array_reader.rs b/parquet/benches/arrow_array_reader.rs
new file mode 100644
index 0000000..6e87512
--- /dev/null
+++ b/parquet/benches/arrow_array_reader.rs
@@ -0,0 +1,766 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::{criterion_group, criterion_main, Criterion};
+use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
+use parquet::{
+ arrow::array_reader::ArrayReader,
+ basic::Encoding,
+ column::page::PageIterator,
+ data_type::{ByteArrayType, Int32Type},
+ schema::types::{ColumnDescPtr, SchemaDescPtr},
+};
+use std::{collections::VecDeque, sync::Arc};
+
+fn build_test_schema() -> SchemaDescPtr {
+ use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
+ let message_type = "
+ message test_schema {
+ REQUIRED INT32 mandatory_int32_leaf;
+ REPEATED Group test_mid_int32 {
+ OPTIONAL INT32 optional_int32_leaf;
+ }
+ REQUIRED BYTE_ARRAY mandatory_string_leaf (UTF8);
+ REPEATED Group test_mid_string {
+ OPTIONAL BYTE_ARRAY optional_string_leaf (UTF8);
+ }
+ }
+ ";
+ parse_message_type(message_type)
+ .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+ .unwrap()
+}
+
+// test data params
+const NUM_ROW_GROUPS: usize = 1;
+const PAGES_PER_GROUP: usize = 2;
+const VALUES_PER_PAGE: usize = 10_000;
+const BATCH_SIZE: usize = 8192;
+
+use rand::{rngs::StdRng, Rng, SeedableRng};
+
+pub fn seedable_rng() -> StdRng {
+ StdRng::seed_from_u64(42)
+}
+
+fn build_plain_encoded_int32_page_iterator(
+ schema: SchemaDescPtr,
+ column_desc: ColumnDescPtr,
+ null_density: f32,
+) -> impl PageIterator + Clone {
+ let max_def_level = column_desc.max_def_level();
+ let max_rep_level = column_desc.max_rep_level();
+ let rep_levels = vec![0; VALUES_PER_PAGE];
+ let mut rng = seedable_rng();
+ let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+ let mut int32_value = 0;
+ for _i in 0..NUM_ROW_GROUPS {
+ let mut column_chunk_pages = Vec::new();
+ for _j in 0..PAGES_PER_GROUP {
+ // generate page
+ let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+ let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+ for _k in 0..VALUES_PER_PAGE {
+ let def_level = if rng.gen::<f32>() < null_density {
+ max_def_level - 1
+ } else {
+ max_def_level
+ };
+ if def_level == max_def_level {
+ int32_value += 1;
+ values.push(int32_value);
+ }
+ def_levels.push(def_level);
+ }
+ let mut page_builder =
+ DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+ page_builder.add_rep_levels(max_rep_level, &rep_levels);
+ page_builder.add_def_levels(max_def_level, &def_levels);
+ page_builder.add_values::<Int32Type>(Encoding::PLAIN, &values);
+ column_chunk_pages.push(page_builder.consume());
+ }
+ pages.push(column_chunk_pages);
+ }
+
+ InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_int32_page_iterator(
+ schema: SchemaDescPtr,
+ column_desc: ColumnDescPtr,
+ null_density: f32,
+) -> impl PageIterator + Clone {
+ use parquet::encoding::{DictEncoder, Encoder};
+ let max_def_level = column_desc.max_def_level();
+ let max_rep_level = column_desc.max_rep_level();
+ let rep_levels = vec![0; VALUES_PER_PAGE];
+ // generate 1% unique values
+ const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+ let unique_values = (0..NUM_UNIQUE_VALUES)
+ .map(|x| (x + 1) as i32)
+ .collect::<Vec<_>>();
+ let mut rng = seedable_rng();
+ let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+ for _i in 0..NUM_ROW_GROUPS {
+ let mut column_chunk_pages = VecDeque::new();
+ let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+ let mut dict_encoder =
+ DictEncoder::<Int32Type>::new(column_desc.clone(), mem_tracker);
+ // add data pages
+ for _j in 0..PAGES_PER_GROUP {
+ // generate page
+ let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+ let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+ for _k in 0..VALUES_PER_PAGE {
+ let def_level = if rng.gen::<f32>() < null_density {
+ max_def_level - 1
+ } else {
+ max_def_level
+ };
+ if def_level == max_def_level {
+ // select random value from list of unique values
+ let int32_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)];
+ values.push(int32_value);
+ }
+ def_levels.push(def_level);
+ }
+ let mut page_builder =
+ DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+ page_builder.add_rep_levels(max_rep_level, &rep_levels);
+ page_builder.add_def_levels(max_def_level, &def_levels);
+ let _ = dict_encoder.put(&values);
+ let indices = dict_encoder
+ .write_indices()
+ .expect("write_indices() should be OK");
+ page_builder.add_indices(indices);
+ column_chunk_pages.push_back(page_builder.consume());
+ }
+ // add dictionary page
+ let dict = dict_encoder
+ .write_dict()
+ .expect("write_dict() should be OK");
+ let dict_page = parquet::column::page::Page::DictionaryPage {
+ buf: dict,
+ num_values: dict_encoder.num_entries() as u32,
+ encoding: Encoding::RLE_DICTIONARY,
+ is_sorted: false,
+ };
+ column_chunk_pages.push_front(dict_page);
+ pages.push(column_chunk_pages.into());
+ }
+
+ InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_plain_encoded_string_page_iterator(
+ schema: SchemaDescPtr,
+ column_desc: ColumnDescPtr,
+ null_density: f32,
+) -> impl PageIterator + Clone {
+ let max_def_level = column_desc.max_def_level();
+ let max_rep_level = column_desc.max_rep_level();
+ let rep_levels = vec![0; VALUES_PER_PAGE];
+ let mut rng = seedable_rng();
+ let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+ for i in 0..NUM_ROW_GROUPS {
+ let mut column_chunk_pages = Vec::new();
+ for j in 0..PAGES_PER_GROUP {
+ // generate page
+ let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+ let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+ for k in 0..VALUES_PER_PAGE {
+ let def_level = if rng.gen::<f32>() < null_density {
+ max_def_level - 1
+ } else {
+ max_def_level
+ };
+ if def_level == max_def_level {
+ let string_value =
+ format!("Test value {}, row group: {}, page: {}", k, i, j);
+ values
+ .push(parquet::data_type::ByteArray::from(string_value.as_str()));
+ }
+ def_levels.push(def_level);
+ }
+ let mut page_builder =
+ DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+ page_builder.add_rep_levels(max_rep_level, &rep_levels);
+ page_builder.add_def_levels(max_def_level, &def_levels);
+ page_builder.add_values::<ByteArrayType>(Encoding::PLAIN, &values);
+ column_chunk_pages.push(page_builder.consume());
+ }
+ pages.push(column_chunk_pages);
+ }
+
+ InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_string_page_iterator(
+ schema: SchemaDescPtr,
+ column_desc: ColumnDescPtr,
+ null_density: f32,
+) -> impl PageIterator + Clone {
+ use parquet::encoding::{DictEncoder, Encoder};
+ let max_def_level = column_desc.max_def_level();
+ let max_rep_level = column_desc.max_rep_level();
+ let rep_levels = vec![0; VALUES_PER_PAGE];
+ // generate 1% unique values
+ const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+ let unique_values = (0..NUM_UNIQUE_VALUES)
+ .map(|x| format!("Dictionary value {}", x))
+ .collect::<Vec<_>>();
+ let mut rng = seedable_rng();
+ let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+ for _i in 0..NUM_ROW_GROUPS {
+ let mut column_chunk_pages = VecDeque::new();
+ let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+ let mut dict_encoder =
+ DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
+ // add data pages
+ for _j in 0..PAGES_PER_GROUP {
+ // generate page
+ let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+ let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+ for _k in 0..VALUES_PER_PAGE {
+ let def_level = if rng.gen::<f32>() < null_density {
+ max_def_level - 1
+ } else {
+ max_def_level
+ };
+ if def_level == max_def_level {
+ // select random value from list of unique values
+ let string_value =
+ unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)].as_str();
+ values.push(parquet::data_type::ByteArray::from(string_value));
+ }
+ def_levels.push(def_level);
+ }
+ let mut page_builder =
+ DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+ page_builder.add_rep_levels(max_rep_level, &rep_levels);
+ page_builder.add_def_levels(max_def_level, &def_levels);
+ let _ = dict_encoder.put(&values);
+ let indices = dict_encoder
+ .write_indices()
+ .expect("write_indices() should be OK");
+ page_builder.add_indices(indices);
+ column_chunk_pages.push_back(page_builder.consume());
+ }
+ // add dictionary page
+ let dict = dict_encoder
+ .write_dict()
+ .expect("write_dict() should be OK");
+ let dict_page = parquet::column::page::Page::DictionaryPage {
+ buf: dict,
+ num_values: dict_encoder.num_entries() as u32,
+ encoding: Encoding::RLE_DICTIONARY,
+ is_sorted: false,
+ };
+ column_chunk_pages.push_front(dict_page);
+ pages.push(column_chunk_pages.into());
+ }
+
+ InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize {
+ // test procedure: read data in batches of 8192 until no more data
+ let mut total_count = 0;
+ loop {
+ let array = array_reader.next_batch(BATCH_SIZE);
+ let array_len = array.unwrap().len();
+ total_count += array_len;
+ if array_len < BATCH_SIZE {
+ break;
+ }
+ }
+ total_count
+}
+
+fn create_int32_arrow_array_reader(
+ page_iterator: impl PageIterator + 'static,
+ column_desc: ColumnDescPtr,
+) -> impl ArrayReader {
+ use parquet::arrow::arrow_array_reader::{ArrowArrayReader, PrimitiveArrayConverter};
+ let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+ ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap()
+}
+
+fn create_int32_primitive_array_reader(
+ page_iterator: impl PageIterator + 'static,
+ column_desc: ColumnDescPtr,
+) -> impl ArrayReader {
+ use parquet::arrow::array_reader::PrimitiveArrayReader;
+ PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
+ .unwrap()
+}
+
+fn create_string_arrow_array_reader(
+ page_iterator: impl PageIterator + 'static,
+ column_desc: ColumnDescPtr,
+) -> impl ArrayReader {
+ use parquet::arrow::arrow_array_reader::{ArrowArrayReader, StringArrayConverter};
+ let converter = StringArrayConverter::new();
+ ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap()
+}
+
+fn create_string_complex_array_reader(
+ page_iterator: impl PageIterator + 'static,
+ column_desc: ColumnDescPtr,
+) -> impl ArrayReader {
+ use parquet::arrow::array_reader::ComplexObjectArrayReader;
+ use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
+ let converter = Utf8Converter::new(Utf8ArrayConverter {});
+ ComplexObjectArrayReader::<parquet::data_type::ByteArrayType, Utf8Converter>::new(
+ Box::new(page_iterator),
+ column_desc,
+ converter,
+ None,
+ )
+ .unwrap()
+}
+
+fn add_benches(c: &mut Criterion) {
+ const EXPECTED_VALUE_COUNT: usize =
+ NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
+ let mut group = c.benchmark_group("arrow_array_reader");
+
+ let mut count: usize = 0;
+
+ let schema = build_test_schema();
+ let mandatory_int32_column_desc = schema.column(0);
+ let optional_int32_column_desc = schema.column(1);
+ let mandatory_string_column_desc = schema.column(2);
+ // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
+ let optional_string_column_desc = schema.column(3);
+ // println!("optional_string_column_desc: {:?}", optional_string_column_desc);
+
+ // primitive / int32 benchmarks
+ // =============================
+
+ // int32, plain encoded, no NULLs
+ let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
+ schema.clone(),
+ mandatory_int32_column_desc.clone(),
+ 0.0,
+ );
+ group.bench_function(
+ "read Int32Array, plain encoded, mandatory, no NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_primitive_array_reader(
+ plain_int32_no_null_data.clone(),
+ mandatory_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read Int32Array, plain encoded, mandatory, no NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_arrow_array_reader(
+ plain_int32_no_null_data.clone(),
+ mandatory_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
+ schema.clone(),
+ optional_int32_column_desc.clone(),
+ 0.0,
+ );
+ group.bench_function(
+ "read Int32Array, plain encoded, optional, no NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_primitive_array_reader(
+ plain_int32_no_null_data.clone(),
+ optional_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read Int32Array, plain encoded, optional, no NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_arrow_array_reader(
+ plain_int32_no_null_data.clone(),
+ optional_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ // int32, plain encoded, half NULLs
+ let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator(
+ schema.clone(),
+ optional_int32_column_desc.clone(),
+ 0.5,
+ );
+ group.bench_function(
+ "read Int32Array, plain encoded, optional, half NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_primitive_array_reader(
+ plain_int32_half_null_data.clone(),
+ optional_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read Int32Array, plain encoded, optional, half NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_arrow_array_reader(
+ plain_int32_half_null_data.clone(),
+ optional_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ // int32, dictionary encoded, no NULLs
+ let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator(
+ schema.clone(),
+ mandatory_int32_column_desc.clone(),
+ 0.0,
+ );
+ group.bench_function(
+ "read Int32Array, dictionary encoded, mandatory, no NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_primitive_array_reader(
+ dictionary_int32_no_null_data.clone(),
+ mandatory_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read Int32Array, dictionary encoded, mandatory, no NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_arrow_array_reader(
+ dictionary_int32_no_null_data.clone(),
+ mandatory_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator(
+ schema.clone(),
+ optional_int32_column_desc.clone(),
+ 0.0,
+ );
+ group.bench_function(
+ "read Int32Array, dictionary encoded, optional, no NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_primitive_array_reader(
+ dictionary_int32_no_null_data.clone(),
+ optional_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read Int32Array, dictionary encoded, optional, no NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_arrow_array_reader(
+ dictionary_int32_no_null_data.clone(),
+ optional_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ // int32, dictionary encoded, half NULLs
+ let dictionary_int32_half_null_data = build_dictionary_encoded_int32_page_iterator(
+ schema.clone(),
+ optional_int32_column_desc.clone(),
+ 0.5,
+ );
+ group.bench_function(
+ "read Int32Array, dictionary encoded, optional, half NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_primitive_array_reader(
+ dictionary_int32_half_null_data.clone(),
+ optional_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read Int32Array, dictionary encoded, optional, half NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_int32_arrow_array_reader(
+ dictionary_int32_half_null_data.clone(),
+ optional_int32_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ // string benchmarks
+ //==============================
+
+ // string, plain encoded, no NULLs
+ let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
+ schema.clone(),
+ mandatory_string_column_desc.clone(),
+ 0.0,
+ );
+ group.bench_function(
+ "read StringArray, plain encoded, mandatory, no NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_complex_array_reader(
+ plain_string_no_null_data.clone(),
+ mandatory_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read StringArray, plain encoded, mandatory, no NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_arrow_array_reader(
+ plain_string_no_null_data.clone(),
+ mandatory_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
+ schema.clone(),
+ optional_string_column_desc.clone(),
+ 0.0,
+ );
+ group.bench_function(
+ "read StringArray, plain encoded, optional, no NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_complex_array_reader(
+ plain_string_no_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read StringArray, plain encoded, optional, no NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_arrow_array_reader(
+ plain_string_no_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ // string, plain encoded, half NULLs
+ let plain_string_half_null_data = build_plain_encoded_string_page_iterator(
+ schema.clone(),
+ optional_string_column_desc.clone(),
+ 0.5,
+ );
+ group.bench_function(
+ "read StringArray, plain encoded, optional, half NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_complex_array_reader(
+ plain_string_half_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read StringArray, plain encoded, optional, half NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_arrow_array_reader(
+ plain_string_half_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ // string, dictionary encoded, no NULLs
+ let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
+ schema.clone(),
+ mandatory_string_column_desc.clone(),
+ 0.0,
+ );
+ group.bench_function(
+ "read StringArray, dictionary encoded, mandatory, no NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_complex_array_reader(
+ dictionary_string_no_null_data.clone(),
+ mandatory_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read StringArray, dictionary encoded, mandatory, no NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_arrow_array_reader(
+ dictionary_string_no_null_data.clone(),
+ mandatory_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
+ schema.clone(),
+ optional_string_column_desc.clone(),
+ 0.0,
+ );
+ group.bench_function(
+ "read StringArray, dictionary encoded, optional, no NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_complex_array_reader(
+ dictionary_string_no_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read StringArray, dictionary encoded, optional, no NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_arrow_array_reader(
+ dictionary_string_no_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ // string, dictionary encoded, half NULLs
+ let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator(
+ schema,
+ optional_string_column_desc.clone(),
+ 0.5,
+ );
+ group.bench_function(
+ "read StringArray, dictionary encoded, optional, half NULLs - old",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_complex_array_reader(
+ dictionary_string_half_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.bench_function(
+ "read StringArray, dictionary encoded, optional, half NULLs - new",
+ |b| {
+ b.iter(|| {
+ let array_reader = create_string_arrow_array_reader(
+ dictionary_string_half_null_data.clone(),
+ optional_string_column_desc.clone(),
+ );
+ count = bench_array_reader(array_reader);
+ })
+ },
+ );
+ assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+ group.finish();
+}
+
+criterion_group!(benches, add_benches);
+criterion_main!(benches);
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index f54e446..bd57cf3 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -60,7 +60,7 @@
Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter,
- LargeUtf8ArrayConverter, LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter,
+ LargeUtf8ArrayConverter, LargeUtf8Converter,
};
use crate::arrow::record_reader::RecordReader;
use crate::arrow::schema::parquet_to_arrow_field;
@@ -570,7 +570,7 @@
T: DataType,
C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
{
- fn new(
+ pub fn new(
pages: Box<dyn PageIterator>,
column_desc: ColumnDescPtr,
converter: C,
@@ -1499,12 +1499,12 @@
arrow_type,
)?))
} else {
- let converter = Utf8Converter::new(Utf8ArrayConverter {});
- Ok(Box::new(ComplexObjectArrayReader::<
- ByteArrayType,
- Utf8Converter,
- >::new(
- page_iterator,
+ use crate::arrow::arrow_array_reader::{
+ ArrowArrayReader, StringArrayConverter,
+ };
+ let converter = StringArrayConverter::new();
+ Ok(Box::new(ArrowArrayReader::try_new(
+ *page_iterator,
column_desc,
converter,
arrow_type,
@@ -1728,7 +1728,7 @@
#[cfg(test)]
mod tests {
use super::*;
- use crate::arrow::converter::Utf8Converter;
+ use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
use crate::arrow::schema::parquet_to_arrow_schema;
use crate::basic::{Encoding, Type as PhysicalType};
use crate::column::page::{Page, PageReader};
diff --git a/parquet/src/arrow/arrow_array_reader.rs b/parquet/src/arrow/arrow_array_reader.rs
new file mode 100644
index 0000000..c06d872
--- /dev/null
+++ b/parquet/src/arrow/arrow_array_reader.rs
@@ -0,0 +1,1562 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::array_reader::ArrayReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::Encoding;
+use crate::errors::{ParquetError, Result};
+use crate::{
+ column::page::{Page, PageIterator},
+ memory::ByteBufferPtr,
+ schema::types::{ColumnDescPtr, ColumnDescriptor},
+};
+use arrow::{
+ array::{ArrayRef, Int16Array},
+ buffer::MutableBuffer,
+ datatypes::{DataType as ArrowType, ToByteSlice},
+};
+use std::{any::Any, collections::VecDeque, marker::PhantomData};
+use std::{cell::RefCell, rc::Rc};
+
+struct UnzipIter<Source, Target, State> {
+ shared_state: Rc<RefCell<State>>,
+ select_item_buffer: fn(&mut State) -> &mut VecDeque<Target>,
+ consume_source_item: fn(source_item: Source, state: &mut State) -> Target,
+}
+
+impl<Source, Target, State> UnzipIter<Source, Target, State> {
+ fn new(
+ shared_state: Rc<RefCell<State>>,
+ item_buffer_selector: fn(&mut State) -> &mut VecDeque<Target>,
+ source_item_consumer: fn(source_item: Source, state: &mut State) -> Target,
+ ) -> Self {
+ Self {
+ shared_state,
+ select_item_buffer: item_buffer_selector,
+ consume_source_item: source_item_consumer,
+ }
+ }
+}
+
+trait UnzipIterState<T> {
+ type SourceIter: Iterator<Item = T>;
+ fn source_iter(&mut self) -> &mut Self::SourceIter;
+}
+
+impl<Source, Target, State: UnzipIterState<Source>> Iterator
+ for UnzipIter<Source, Target, State>
+{
+ type Item = Target;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ let mut inner = self.shared_state.borrow_mut();
+ // try to get one from the stored data
+ (self.select_item_buffer)(&mut *inner)
+ .pop_front()
+ .or_else(||
+ // nothing stored, we need a new element.
+ inner.source_iter().next().map(|s| {
+ (self.consume_source_item)(s, &mut inner)
+ }))
+ }
+}
+
+struct PageBufferUnzipIterState<V, L, It> {
+ iter: It,
+ value_iter_buffer: VecDeque<V>,
+ def_level_iter_buffer: VecDeque<L>,
+ rep_level_iter_buffer: VecDeque<L>,
+}
+
+impl<V, L, It: Iterator<Item = (V, L, L)>> UnzipIterState<(V, L, L)>
+ for PageBufferUnzipIterState<V, L, It>
+{
+ type SourceIter = It;
+
+ #[inline]
+ fn source_iter(&mut self) -> &mut Self::SourceIter {
+ &mut self.iter
+ }
+}
+
+type ValueUnzipIter<V, L, It> =
+ UnzipIter<(V, L, L), V, PageBufferUnzipIterState<V, L, It>>;
+type LevelUnzipIter<V, L, It> =
+ UnzipIter<(V, L, L), L, PageBufferUnzipIterState<V, L, It>>;
+type PageUnzipResult<V, L, It> = (
+ ValueUnzipIter<V, L, It>,
+ LevelUnzipIter<V, L, It>,
+ LevelUnzipIter<V, L, It>,
+);
+
+fn unzip_iter<V, L, It: Iterator<Item = (V, L, L)>>(it: It) -> PageUnzipResult<V, L, It> {
+ let shared_data = Rc::new(RefCell::new(PageBufferUnzipIterState {
+ iter: it,
+ value_iter_buffer: VecDeque::new(),
+ def_level_iter_buffer: VecDeque::new(),
+ rep_level_iter_buffer: VecDeque::new(),
+ }));
+
+ let value_iter = UnzipIter::new(
+ shared_data.clone(),
+ |state| &mut state.value_iter_buffer,
+ |(v, d, r), state| {
+ state.def_level_iter_buffer.push_back(d);
+ state.rep_level_iter_buffer.push_back(r);
+ v
+ },
+ );
+
+ let def_level_iter = UnzipIter::new(
+ shared_data.clone(),
+ |state| &mut state.def_level_iter_buffer,
+ |(v, d, r), state| {
+ state.value_iter_buffer.push_back(v);
+ state.rep_level_iter_buffer.push_back(r);
+ d
+ },
+ );
+
+ let rep_level_iter = UnzipIter::new(
+ shared_data,
+ |state| &mut state.rep_level_iter_buffer,
+ |(v, d, r), state| {
+ state.value_iter_buffer.push_back(v);
+ state.def_level_iter_buffer.push_back(d);
+ r
+ },
+ );
+
+ (value_iter, def_level_iter, rep_level_iter)
+}
+
+pub trait ArrayConverter {
+ fn convert_value_bytes(
+ &self,
+ value_decoder: &mut impl ValueDecoder,
+ num_values: usize,
+ ) -> Result<arrow::array::ArrayData>;
+}
+
+pub struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
+ column_desc: ColumnDescPtr,
+ data_type: ArrowType,
+ def_level_decoder: Box<dyn ValueDecoder + 'a>,
+ rep_level_decoder: Box<dyn ValueDecoder + 'a>,
+ value_decoder: Box<dyn ValueDecoder + 'a>,
+ last_def_levels: Option<Int16Array>,
+ last_rep_levels: Option<Int16Array>,
+ array_converter: C,
+}
+
+pub(crate) struct ColumnChunkContext {
+ dictionary_values: Option<Vec<ByteBufferPtr>>,
+}
+
+impl ColumnChunkContext {
+ fn new() -> Self {
+ Self {
+ dictionary_values: None,
+ }
+ }
+
+ fn set_dictionary(&mut self, dictionary_values: Vec<ByteBufferPtr>) {
+ self.dictionary_values = Some(dictionary_values);
+ }
+}
+
+type PageDecoderTuple = (
+ Box<dyn ValueDecoder>,
+ Box<dyn ValueDecoder>,
+ Box<dyn ValueDecoder>,
+);
+
+impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
+ pub fn try_new<P: PageIterator + 'a>(
+ column_chunk_iterator: P,
+ column_desc: ColumnDescPtr,
+ array_converter: C,
+ arrow_type: Option<ArrowType>,
+ ) -> Result<Self> {
+ let data_type = match arrow_type {
+ Some(t) => t,
+ None => parquet_to_arrow_field(column_desc.as_ref())?
+ .data_type()
+ .clone(),
+ };
+ type PageIteratorItem = Result<(Page, Rc<RefCell<ColumnChunkContext>>)>;
+ let page_iter = column_chunk_iterator
+ // build iterator of pages across column chunks
+ .flat_map(|x| -> Box<dyn Iterator<Item = PageIteratorItem>> {
+ // attach column chunk context
+ let context = Rc::new(RefCell::new(ColumnChunkContext::new()));
+ match x {
+ Ok(page_reader) => Box::new(
+ page_reader.map(move |pr| pr.map(|p| (p, context.clone()))),
+ ),
+ // errors from reading column chunks / row groups are propagated to page level
+ Err(e) => Box::new(std::iter::once(Err(e))),
+ }
+ });
+ // capture a clone of column_desc in closure so that it can outlive current function
+ let map_page_fn_factory = |column_desc: ColumnDescPtr| {
+ move |x: Result<(Page, Rc<RefCell<ColumnChunkContext>>)>| {
+ x.and_then(|(page, context)| {
+ Self::map_page(page, context, column_desc.as_ref())
+ })
+ }
+ };
+ let map_page_fn = map_page_fn_factory(column_desc.clone());
+ // map page iterator into tuple of buffer iterators for (values, def levels, rep levels)
+ // errors from lower levels are surfaced through the value decoder iterator
+ let decoder_iter = page_iter.map(map_page_fn).map(|x| match x {
+ Ok(iter_tuple) => iter_tuple,
+ // errors from reading pages are propagated to decoder iterator level
+ Err(e) => Self::map_page_error(e),
+ });
+ // split tuple iterator into separate iterators for (values, def levels, rep levels)
+ let (value_iter, def_level_iter, rep_level_iter) = unzip_iter(decoder_iter);
+
+ Ok(Self {
+ column_desc,
+ data_type,
+ def_level_decoder: Box::new(CompositeValueDecoder::new(def_level_iter)),
+ rep_level_decoder: Box::new(CompositeValueDecoder::new(rep_level_iter)),
+ value_decoder: Box::new(CompositeValueDecoder::new(value_iter)),
+ last_def_levels: None,
+ last_rep_levels: None,
+ array_converter,
+ })
+ }
+
+ #[inline]
+ fn def_levels_available(column_desc: &ColumnDescriptor) -> bool {
+ column_desc.max_def_level() > 0
+ }
+
+ #[inline]
+ fn rep_levels_available(column_desc: &ColumnDescriptor) -> bool {
+ column_desc.max_rep_level() > 0
+ }
+
+ fn map_page_error(err: ParquetError) -> PageDecoderTuple {
+ (
+ Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
+ Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
+ Box::new(<dyn ValueDecoder>::once(Err(err))),
+ )
+ }
+
+ // Split Result<Page> into Result<(Iterator<Values>, Iterator<DefLevels>, Iterator<RepLevels>)>
+ // this method could fail, e.g. if the page encoding is not supported
+ fn map_page(
+ page: Page,
+ column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
+ column_desc: &ColumnDescriptor,
+ ) -> Result<PageDecoderTuple> {
+ use crate::encodings::levels::LevelDecoder;
+ match page {
+ Page::DictionaryPage {
+ buf,
+ num_values,
+ encoding,
+ ..
+ } => {
+ let mut column_chunk_context = column_chunk_context.borrow_mut();
+ if column_chunk_context.dictionary_values.is_some() {
+ return Err(general_err!(
+ "Column chunk cannot have more than one dictionary"
+ ));
+ }
+ // create plain decoder for dictionary values
+ let mut dict_decoder = Self::get_dictionary_page_decoder(
+ buf,
+ num_values as usize,
+ encoding,
+ column_desc,
+ )?;
+ // decode and cache dictionary values
+ let dictionary_values = dict_decoder.read_dictionary_values()?;
+ column_chunk_context.set_dictionary(dictionary_values);
+
+ // a dictionary page doesn't return any values
+ Ok((
+ Box::new(<dyn ValueDecoder>::empty()),
+ Box::new(<dyn ValueDecoder>::empty()),
+ Box::new(<dyn ValueDecoder>::empty()),
+ ))
+ }
+ Page::DataPage {
+ buf,
+ num_values,
+ encoding,
+ def_level_encoding,
+ rep_level_encoding,
+ statistics: _,
+ } => {
+ let mut buffer_ptr = buf;
+ // create rep level decoder iterator
+ let rep_level_iter: Box<dyn ValueDecoder> =
+ if Self::rep_levels_available(&column_desc) {
+ let mut rep_decoder = LevelDecoder::v1(
+ rep_level_encoding,
+ column_desc.max_rep_level(),
+ );
+ let rep_level_byte_len =
+ rep_decoder.set_data(num_values as usize, buffer_ptr.all());
+ // advance buffer pointer
+ buffer_ptr = buffer_ptr.start_from(rep_level_byte_len);
+ Box::new(LevelValueDecoder::new(rep_decoder))
+ } else {
+ Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
+ "rep levels are not available".to_string(),
+ ))))
+ };
+ // create def level decoder iterator
+ let def_level_iter: Box<dyn ValueDecoder> =
+ if Self::def_levels_available(&column_desc) {
+ let mut def_decoder = LevelDecoder::v1(
+ def_level_encoding,
+ column_desc.max_def_level(),
+ );
+ let def_levels_byte_len =
+ def_decoder.set_data(num_values as usize, buffer_ptr.all());
+ // advance buffer pointer
+ buffer_ptr = buffer_ptr.start_from(def_levels_byte_len);
+ Box::new(LevelValueDecoder::new(def_decoder))
+ } else {
+ Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
+ "def levels are not available".to_string(),
+ ))))
+ };
+ // create value decoder iterator
+ let value_iter = Self::get_value_decoder(
+ buffer_ptr,
+ num_values as usize,
+ encoding,
+ column_desc,
+ column_chunk_context,
+ )?;
+ Ok((value_iter, def_level_iter, rep_level_iter))
+ }
+ Page::DataPageV2 {
+ buf,
+ num_values,
+ encoding,
+ num_nulls: _,
+ num_rows: _,
+ def_levels_byte_len,
+ rep_levels_byte_len,
+ is_compressed: _,
+ statistics: _,
+ } => {
+ let mut offset = 0;
+ // create rep level decoder iterator
+ let rep_level_iter: Box<dyn ValueDecoder> =
+ if Self::rep_levels_available(&column_desc) {
+ let rep_levels_byte_len = rep_levels_byte_len as usize;
+ let mut rep_decoder =
+ LevelDecoder::v2(column_desc.max_rep_level());
+ rep_decoder.set_data_range(
+ num_values as usize,
+ &buf,
+ offset,
+ rep_levels_byte_len,
+ );
+ offset += rep_levels_byte_len;
+ Box::new(LevelValueDecoder::new(rep_decoder))
+ } else {
+ Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
+ "rep levels are not available".to_string(),
+ ))))
+ };
+ // create def level decoder iterator
+ let def_level_iter: Box<dyn ValueDecoder> =
+ if Self::def_levels_available(&column_desc) {
+ let def_levels_byte_len = def_levels_byte_len as usize;
+ let mut def_decoder =
+ LevelDecoder::v2(column_desc.max_def_level());
+ def_decoder.set_data_range(
+ num_values as usize,
+ &buf,
+ offset,
+ def_levels_byte_len,
+ );
+ offset += def_levels_byte_len;
+ Box::new(LevelValueDecoder::new(def_decoder))
+ } else {
+ Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
+ "def levels are not available".to_string(),
+ ))))
+ };
+
+ // create value decoder iterator
+ let values_buffer = buf.start_from(offset);
+ let value_iter = Self::get_value_decoder(
+ values_buffer,
+ num_values as usize,
+ encoding,
+ column_desc,
+ column_chunk_context,
+ )?;
+ Ok((value_iter, def_level_iter, rep_level_iter))
+ }
+ }
+ }
+
+ fn get_dictionary_page_decoder(
+ values_buffer: ByteBufferPtr,
+ num_values: usize,
+ mut encoding: Encoding,
+ column_desc: &ColumnDescriptor,
+ ) -> Result<Box<dyn DictionaryValueDecoder>> {
+ if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+ encoding = Encoding::RLE_DICTIONARY
+ }
+
+ if encoding == Encoding::RLE_DICTIONARY {
+ Ok(
+ Self::get_plain_value_decoder(values_buffer, num_values, column_desc)
+ .into_dictionary_decoder(),
+ )
+ } else {
+ Err(nyi_err!(
+ "Invalid/Unsupported encoding type for dictionary: {}",
+ encoding
+ ))
+ }
+ }
+
+ fn get_value_decoder(
+ values_buffer: ByteBufferPtr,
+ num_values: usize,
+ mut encoding: Encoding,
+ column_desc: &ColumnDescriptor,
+ column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
+ ) -> Result<Box<dyn ValueDecoder>> {
+ if encoding == Encoding::PLAIN_DICTIONARY {
+ encoding = Encoding::RLE_DICTIONARY;
+ }
+
+ match encoding {
+ Encoding::PLAIN => {
+ Ok(
+ Self::get_plain_value_decoder(values_buffer, num_values, column_desc)
+ .into_value_decoder(),
+ )
+ }
+ Encoding::RLE_DICTIONARY => {
+ if column_chunk_context.borrow().dictionary_values.is_some() {
+ let value_bit_len = Self::get_column_physical_bit_len(column_desc);
+ let dictionary_decoder: Box<dyn ValueDecoder> = if value_bit_len == 0
+ {
+ Box::new(VariableLenDictionaryDecoder::new(
+ column_chunk_context,
+ values_buffer,
+ num_values,
+ ))
+ } else {
+ Box::new(FixedLenDictionaryDecoder::new(
+ column_chunk_context,
+ values_buffer,
+ num_values,
+ value_bit_len,
+ ))
+ };
+ Ok(dictionary_decoder)
+ } else {
+ Err(general_err!("Dictionary values have not been initialized."))
+ }
+ }
+ // Encoding::RLE => Box::new(RleValueDecoder::new()),
+ // Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
+ // Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
+ // Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
+ e => return Err(nyi_err!("Encoding {} is not supported", e)),
+ }
+ }
+
+ fn get_column_physical_bit_len(column_desc: &ColumnDescriptor) -> usize {
+ use crate::basic::Type as PhysicalType;
+ // parquet only supports a limited number of physical types
+ // later converters cast to a more specific arrow / logical type if necessary
+ match column_desc.physical_type() {
+ PhysicalType::BOOLEAN => 1,
+ PhysicalType::INT32 | PhysicalType::FLOAT => 32,
+ PhysicalType::INT64 | PhysicalType::DOUBLE => 64,
+ PhysicalType::INT96 => 96,
+ PhysicalType::BYTE_ARRAY => 0,
+ PhysicalType::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize * 8,
+ }
+ }
+
+ fn get_plain_value_decoder(
+ values_buffer: ByteBufferPtr,
+ num_values: usize,
+ column_desc: &ColumnDescriptor,
+ ) -> Box<dyn PlainValueDecoder> {
+ let value_bit_len = Self::get_column_physical_bit_len(column_desc);
+ if value_bit_len == 0 {
+ Box::new(VariableLenPlainDecoder::new(values_buffer, num_values))
+ } else {
+ Box::new(FixedLenPlainDecoder::new(
+ values_buffer,
+ num_values,
+ value_bit_len,
+ ))
+ }
+ }
+
+ fn build_level_array(
+ level_decoder: &mut impl ValueDecoder,
+ batch_size: usize,
+ ) -> Result<Int16Array> {
+ use arrow::datatypes::Int16Type;
+ let level_converter = PrimitiveArrayConverter::<Int16Type>::new();
+ let array_data =
+ level_converter.convert_value_bytes(level_decoder, batch_size)?;
+ Ok(Int16Array::from(array_data))
+ }
+}
+
+impl<C: ArrayConverter> ArrayReader for ArrowArrayReader<'static, C> {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn get_data_type(&self) -> &ArrowType {
+ &self.data_type
+ }
+
+ fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+ if Self::rep_levels_available(&self.column_desc) {
+ // read rep levels if available
+ let rep_level_array =
+ Self::build_level_array(&mut self.rep_level_decoder, batch_size)?;
+ self.last_rep_levels = Some(rep_level_array);
+ }
+
+ // check if def levels are available
+ let (values_to_read, null_bitmap_array) =
+ if !Self::def_levels_available(&self.column_desc) {
+ // if no def levels - just read (up to) batch_size values
+ (batch_size, None)
+ } else {
+ // if def levels are available - they determine how many values will be read
+ // decode def levels, return first error if any
+ let def_level_array =
+ Self::build_level_array(&mut self.def_level_decoder, batch_size)?;
+ let def_level_count = def_level_array.len();
+ // use eq_scalar to efficiently build null bitmap array from def levels
+ let null_bitmap_array = arrow::compute::eq_scalar(
+ &def_level_array,
+ self.column_desc.max_def_level(),
+ )?;
+ self.last_def_levels = Some(def_level_array);
+ // efficiently calculate values to read
+ let values_to_read = null_bitmap_array
+ .values()
+ .count_set_bits_offset(0, def_level_count);
+ let maybe_null_bitmap = if values_to_read != null_bitmap_array.len() {
+ Some(null_bitmap_array)
+ } else {
+ // shortcut if no NULLs
+ None
+ };
+ (values_to_read, maybe_null_bitmap)
+ };
+
+ // read a batch of values
+ // converter only creates a no-null / all value array data
+ let mut value_array_data = self
+ .array_converter
+ .convert_value_bytes(&mut self.value_decoder, values_to_read)?;
+
+ if let Some(null_bitmap_array) = null_bitmap_array {
+ // Only if def levels are available - insert null values efficiently using MutableArrayData.
+ // This will require value bytes to be copied again, but converter requirements are reduced.
+ // With a small number of NULLs, this will only be a few copies of large byte sequences.
+ let actual_batch_size = null_bitmap_array.len();
+ // use_nulls is false, because null_bitmap_array is already calculated and re-used
+ let mut mutable = arrow::array::MutableArrayData::new(
+ vec![&value_array_data],
+ false,
+ actual_batch_size,
+ );
+ // SlicesIterator slices only the true values, NULLs are inserted to fill any gaps
+ arrow::compute::SlicesIterator::new(&null_bitmap_array).for_each(
+ |(start, end)| {
+ // the gap needs to be filled with NULLs
+ if start > mutable.len() {
+ let nulls_to_add = start - mutable.len();
+ mutable.extend_nulls(nulls_to_add);
+ }
+ // fill values, adjust start and end with NULL count so far
+ let nulls_added = mutable.null_count();
+ mutable.extend(0, start - nulls_added, end - nulls_added);
+ },
+ );
+ // any remaining part is NULLs
+ if mutable.len() < actual_batch_size {
+ let nulls_to_add = actual_batch_size - mutable.len();
+ mutable.extend_nulls(nulls_to_add);
+ }
+
+ value_array_data = mutable
+ .into_builder()
+ .null_bit_buffer(null_bitmap_array.values().clone())
+ .build();
+ }
+ let mut array = arrow::array::make_array(value_array_data);
+ if array.data_type() != &self.data_type {
+ // cast array to self.data_type if necessary
+ array = arrow::compute::cast(&array, &self.data_type)?
+ }
+ Ok(array)
+ }
+
+ fn get_def_levels(&self) -> Option<&[i16]> {
+ self.last_def_levels.as_ref().map(|x| x.values())
+ }
+
+ fn get_rep_levels(&self) -> Option<&[i16]> {
+ self.last_rep_levels.as_ref().map(|x| x.values())
+ }
+}
+
+use crate::encodings::rle::RleDecoder;
+
+pub trait ValueDecoder {
+ fn read_value_bytes(
+ &mut self,
+ num_values: usize,
+ read_bytes: &mut dyn FnMut(&[u8], usize),
+ ) -> Result<usize>;
+}
+
+trait DictionaryValueDecoder {
+ fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>>;
+}
+
+trait PlainValueDecoder: ValueDecoder + DictionaryValueDecoder {
+ fn into_value_decoder(self: Box<Self>) -> Box<dyn ValueDecoder>;
+ fn into_dictionary_decoder(self: Box<Self>) -> Box<dyn DictionaryValueDecoder>;
+}
+
+impl<T> PlainValueDecoder for T
+where
+ T: ValueDecoder + DictionaryValueDecoder + 'static,
+{
+ fn into_value_decoder(self: Box<T>) -> Box<dyn ValueDecoder> {
+ self
+ }
+
+ fn into_dictionary_decoder(self: Box<T>) -> Box<dyn DictionaryValueDecoder> {
+ self
+ }
+}
+
+impl dyn ValueDecoder {
+ fn empty() -> impl ValueDecoder {
+ SingleValueDecoder::new(Ok(0))
+ }
+
+ fn once(value: Result<usize>) -> impl ValueDecoder {
+ SingleValueDecoder::new(value)
+ }
+}
+
+impl ValueDecoder for Box<dyn ValueDecoder> {
+ #[inline]
+ fn read_value_bytes(
+ &mut self,
+ num_values: usize,
+ read_bytes: &mut dyn FnMut(&[u8], usize),
+ ) -> Result<usize> {
+ self.as_mut().read_value_bytes(num_values, read_bytes)
+ }
+}
+
+struct SingleValueDecoder {
+ value: Result<usize>,
+}
+
+impl SingleValueDecoder {
+ fn new(value: Result<usize>) -> Self {
+ Self { value }
+ }
+}
+
+impl ValueDecoder for SingleValueDecoder {
+ fn read_value_bytes(
+ &mut self,
+ _num_values: usize,
+ _read_bytes: &mut dyn FnMut(&[u8], usize),
+ ) -> Result<usize> {
+ self.value.clone()
+ }
+}
+
+struct CompositeValueDecoder<I: Iterator<Item = Box<dyn ValueDecoder>>> {
+ current_decoder: Option<Box<dyn ValueDecoder>>,
+ decoder_iter: I,
+}
+
+impl<I: Iterator<Item = Box<dyn ValueDecoder>>> CompositeValueDecoder<I> {
+ fn new(mut decoder_iter: I) -> Self {
+ let current_decoder = decoder_iter.next();
+ Self {
+ current_decoder,
+ decoder_iter,
+ }
+ }
+}
+
+impl<I: Iterator<Item = Box<dyn ValueDecoder>>> ValueDecoder
+ for CompositeValueDecoder<I>
+{
+ fn read_value_bytes(
+ &mut self,
+ num_values: usize,
+ read_bytes: &mut dyn FnMut(&[u8], usize),
+ ) -> Result<usize> {
+ let mut values_to_read = num_values;
+ while values_to_read > 0 {
+ let value_decoder = match self.current_decoder.as_mut() {
+ Some(d) => d,
+ // no more decoders
+ None => break,
+ };
+ while values_to_read > 0 {
+ let values_read =
+ value_decoder.read_value_bytes(values_to_read, read_bytes)?;
+ if values_read > 0 {
+ values_to_read -= values_read;
+ } else {
+ // no more values in current decoder
+ self.current_decoder = self.decoder_iter.next();
+ break;
+ }
+ }
+ }
+
+ Ok(num_values - values_to_read)
+ }
+}
+
+struct LevelValueDecoder {
+ level_decoder: crate::encodings::levels::LevelDecoder,
+ level_value_buffer: Vec<i16>,
+}
+
+impl LevelValueDecoder {
+ fn new(level_decoder: crate::encodings::levels::LevelDecoder) -> Self {
+ Self {
+ level_decoder,
+ level_value_buffer: vec![0i16; 2048],
+ }
+ }
+}
+
+impl ValueDecoder for LevelValueDecoder {
+ fn read_value_bytes(
+ &mut self,
+ num_values: usize,
+ read_bytes: &mut dyn FnMut(&[u8], usize),
+ ) -> Result<usize> {
+ let value_size = std::mem::size_of::<i16>();
+ let mut total_values_read = 0;
+ while total_values_read < num_values {
+ let values_to_read = std::cmp::min(
+ num_values - total_values_read,
+ self.level_value_buffer.len(),
+ );
+ let values_read = match self
+ .level_decoder
+ .get(&mut self.level_value_buffer[..values_to_read])
+ {
+ Ok(values_read) => values_read,
+ Err(e) => return Err(e),
+ };
+ if values_read > 0 {
+ let level_value_bytes =
+ &self.level_value_buffer.to_byte_slice()[..values_read * value_size];
+ read_bytes(level_value_bytes, values_read);
+ total_values_read += values_read;
+ } else {
+ break;
+ }
+ }
+ Ok(total_values_read)
+ }
+}
+
+pub(crate) struct FixedLenPlainDecoder {
+ data: ByteBufferPtr,
+ num_values: usize,
+ value_bit_len: usize,
+}
+
+impl FixedLenPlainDecoder {
+ pub(crate) fn new(
+ data: ByteBufferPtr,
+ num_values: usize,
+ value_bit_len: usize,
+ ) -> Self {
+ Self {
+ data,
+ num_values,
+ value_bit_len,
+ }
+ }
+}
+
+impl DictionaryValueDecoder for FixedLenPlainDecoder {
+ fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
+ let value_byte_len = self.value_bit_len / 8;
+ let available_values = self.data.len() / value_byte_len;
+ let values_to_read = std::cmp::min(available_values, self.num_values);
+ let byte_len = values_to_read * value_byte_len;
+ let values = vec![self.data.range(0, byte_len)];
+ self.num_values = 0;
+ self.data.set_range(self.data.start(), 0);
+ Ok(values)
+ }
+}
+
+impl ValueDecoder for FixedLenPlainDecoder {
+ fn read_value_bytes(
+ &mut self,
+ num_values: usize,
+ read_bytes: &mut dyn FnMut(&[u8], usize),
+ ) -> Result<usize> {
+ let available_values = self.data.len() * 8 / self.value_bit_len;
+ if available_values > 0 {
+ let values_to_read = std::cmp::min(available_values, num_values);
+ let byte_len = values_to_read * self.value_bit_len / 8;
+ read_bytes(&self.data.data()[..byte_len], values_to_read);
+ self.data
+ .set_range(self.data.start() + byte_len, self.data.len() - byte_len);
+ Ok(values_to_read)
+ } else {
+ Ok(0)
+ }
+ }
+}
+
+pub(crate) struct VariableLenPlainDecoder {
+ data: ByteBufferPtr,
+ num_values: usize,
+ position: usize,
+}
+
+impl VariableLenPlainDecoder {
+ pub(crate) fn new(data: ByteBufferPtr, num_values: usize) -> Self {
+ Self {
+ data,
+ num_values,
+ position: 0,
+ }
+ }
+}
+
+impl DictionaryValueDecoder for VariableLenPlainDecoder {
+ fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
+ const LEN_SIZE: usize = std::mem::size_of::<u32>();
+ let data = self.data.data();
+ let data_len = data.len();
+ let values_to_read = self.num_values;
+ let mut values = Vec::with_capacity(values_to_read);
+ let mut values_read = 0;
+ while self.position < data_len && values_read < values_to_read {
+ let len: usize =
+ read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize;
+ self.position += LEN_SIZE;
+ if data_len < self.position + len {
+ return Err(eof_err!("Not enough bytes to decode"));
+ }
+ values.push(self.data.range(self.position, len));
+ self.position += len;
+ values_read += 1;
+ }
+ self.num_values -= values_read;
+ Ok(values)
+ }
+}
+
+impl ValueDecoder for VariableLenPlainDecoder {
+ fn read_value_bytes(
+ &mut self,
+ num_values: usize,
+ read_bytes: &mut dyn FnMut(&[u8], usize),
+ ) -> Result<usize> {
+ const LEN_SIZE: usize = std::mem::size_of::<u32>();
+ let data = self.data.data();
+ let data_len = data.len();
+ let values_to_read = std::cmp::min(self.num_values, num_values);
+ let mut values_read = 0;
+ while self.position < data_len && values_read < values_to_read {
+ let len: usize =
+ read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize;
+ self.position += LEN_SIZE;
+ if data_len < self.position + len {
+ return Err(eof_err!("Not enough bytes to decode"));
+ }
+ read_bytes(&data[self.position..][..len], 1);
+ self.position += len;
+ values_read += 1;
+ }
+ self.num_values -= values_read;
+ Ok(values_read)
+ }
+}
+
+pub(crate) struct FixedLenDictionaryDecoder {
+ context_ref: Rc<RefCell<ColumnChunkContext>>,
+ key_data_bufer: ByteBufferPtr,
+ num_values: usize,
+ rle_decoder: RleDecoder,
+ value_byte_len: usize,
+ keys_buffer: Vec<i32>,
+}
+
+impl FixedLenDictionaryDecoder {
+ pub(crate) fn new(
+ column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
+ key_data_bufer: ByteBufferPtr,
+ num_values: usize,
+ value_bit_len: usize,
+ ) -> Self {
+ assert!(
+ value_bit_len % 8 == 0,
+ "value_bit_size must be a multiple of 8"
+ );
+ // First byte in `data` is bit width
+ let bit_width = key_data_bufer.data()[0];
+ let mut rle_decoder = RleDecoder::new(bit_width);
+ rle_decoder.set_data(key_data_bufer.start_from(1));
+
+ Self {
+ context_ref: column_chunk_context,
+ key_data_bufer,
+ num_values,
+ rle_decoder,
+ value_byte_len: value_bit_len / 8,
+ keys_buffer: vec![0; 2048],
+ }
+ }
+}
+
+impl ValueDecoder for FixedLenDictionaryDecoder {
+ fn read_value_bytes(
+ &mut self,
+ num_values: usize,
+ read_bytes: &mut dyn FnMut(&[u8], usize),
+ ) -> Result<usize> {
+ if self.num_values == 0 {
+ return Ok(0);
+ }
+ let context = self.context_ref.borrow();
+ let values = context.dictionary_values.as_ref().unwrap();
+ let input_value_bytes = values[0].data();
+ // read no more than available values or requested values
+ let values_to_read = std::cmp::min(self.num_values, num_values);
+ let mut values_read = 0;
+ while values_read < values_to_read {
+ // read values in batches of up to self.keys_buffer.len()
+ let keys_to_read =
+ std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
+ let keys_read = match self
+ .rle_decoder
+ .get_batch(&mut self.keys_buffer[..keys_to_read])
+ {
+ Ok(keys_read) => keys_read,
+ Err(e) => return Err(e),
+ };
+ if keys_read == 0 {
+ self.num_values = 0;
+ return Ok(values_read);
+ }
+ for i in 0..keys_read {
+ let key = self.keys_buffer[i] as usize;
+ read_bytes(
+ &input_value_bytes[key * self.value_byte_len..]
+ [..self.value_byte_len],
+ 1,
+ );
+ }
+ values_read += keys_read;
+ }
+ self.num_values -= values_read;
+ Ok(values_read)
+ }
+}
+
+pub(crate) struct VariableLenDictionaryDecoder {
+ context_ref: Rc<RefCell<ColumnChunkContext>>,
+ key_data_bufer: ByteBufferPtr,
+ num_values: usize,
+ rle_decoder: RleDecoder,
+ keys_buffer: Vec<i32>,
+}
+
+impl VariableLenDictionaryDecoder {
+ pub(crate) fn new(
+ column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
+ key_data_bufer: ByteBufferPtr,
+ num_values: usize,
+ ) -> Self {
+ // First byte in `data` is bit width
+ let bit_width = key_data_bufer.data()[0];
+ let mut rle_decoder = RleDecoder::new(bit_width);
+ rle_decoder.set_data(key_data_bufer.start_from(1));
+
+ Self {
+ context_ref: column_chunk_context,
+ key_data_bufer,
+ num_values,
+ rle_decoder,
+ keys_buffer: vec![0; 2048],
+ }
+ }
+}
+
+impl ValueDecoder for VariableLenDictionaryDecoder {
+ fn read_value_bytes(
+ &mut self,
+ num_values: usize,
+ read_bytes: &mut dyn FnMut(&[u8], usize),
+ ) -> Result<usize> {
+ if self.num_values == 0 {
+ return Ok(0);
+ }
+ let context = self.context_ref.borrow();
+ let values = context.dictionary_values.as_ref().unwrap();
+ let values_to_read = std::cmp::min(self.num_values, num_values);
+ let mut values_read = 0;
+ while values_read < values_to_read {
+ // read values in batches of up to self.keys_buffer.len()
+ let keys_to_read =
+ std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
+ let keys_read = match self
+ .rle_decoder
+ .get_batch(&mut self.keys_buffer[..keys_to_read])
+ {
+ Ok(keys_read) => keys_read,
+ Err(e) => return Err(e),
+ };
+ if keys_read == 0 {
+ self.num_values = 0;
+ return Ok(values_read);
+ }
+ for i in 0..keys_read {
+ let key = self.keys_buffer[i] as usize;
+ read_bytes(values[key].data(), 1);
+ }
+ values_read += keys_read;
+ }
+ self.num_values -= values_read;
+ Ok(values_read)
+ }
+}
+
+use arrow::datatypes::ArrowPrimitiveType;
+
+pub struct PrimitiveArrayConverter<T: ArrowPrimitiveType> {
+ _phantom_data: PhantomData<T>,
+}
+
+impl<T: ArrowPrimitiveType> PrimitiveArrayConverter<T> {
+ pub fn new() -> Self {
+ Self {
+ _phantom_data: PhantomData,
+ }
+ }
+}
+
+impl<T: ArrowPrimitiveType> ArrayConverter for PrimitiveArrayConverter<T> {
+ fn convert_value_bytes(
+ &self,
+ value_decoder: &mut impl ValueDecoder,
+ num_values: usize,
+ ) -> Result<arrow::array::ArrayData> {
+ let value_size = T::get_byte_width();
+ let values_byte_capacity = num_values * value_size;
+ let mut values_buffer = MutableBuffer::new(values_byte_capacity);
+
+ value_decoder.read_value_bytes(num_values, &mut |value_bytes, _| {
+ values_buffer.extend_from_slice(value_bytes);
+ })?;
+
+ // calculate actual data_len, which may be different from the iterator's upper bound
+ let value_count = values_buffer.len() / value_size;
+ let array_data = arrow::array::ArrayData::builder(T::DATA_TYPE)
+ .len(value_count)
+ .add_buffer(values_buffer.into())
+ .build();
+ Ok(array_data)
+ }
+}
+
+pub struct StringArrayConverter {}
+
+impl StringArrayConverter {
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl ArrayConverter for StringArrayConverter {
+ fn convert_value_bytes(
+ &self,
+ value_decoder: &mut impl ValueDecoder,
+ num_values: usize,
+ ) -> Result<arrow::array::ArrayData> {
+ use arrow::datatypes::ArrowNativeType;
+ let offset_size = std::mem::size_of::<i32>();
+ let mut offsets_buffer = MutableBuffer::new((num_values + 1) * offset_size);
+ // allocate initial capacity of 1 byte for each item
+ let values_byte_capacity = num_values;
+ let mut values_buffer = MutableBuffer::new(values_byte_capacity);
+
+ let mut length_so_far = i32::default();
+ offsets_buffer.push(length_so_far);
+
+ value_decoder.read_value_bytes(num_values, &mut |value_bytes, values_read| {
+ debug_assert_eq!(
+ values_read, 1,
+ "offset length value buffers can only contain bytes for a single value"
+ );
+ length_so_far +=
+ <i32 as ArrowNativeType>::from_usize(value_bytes.len()).unwrap();
+ // this should be safe because a ValueDecoder should not read more than num_values
+ unsafe {
+ offsets_buffer.push_unchecked(length_so_far);
+ }
+ values_buffer.extend_from_slice(value_bytes);
+ })?;
+ // calculate actual data_len, which may be different from the iterator's upper bound
+ let data_len = (offsets_buffer.len() / offset_size) - 1;
+ let array_data = arrow::array::ArrayData::builder(ArrowType::Utf8)
+ .len(data_len)
+ .add_buffer(offsets_buffer.into())
+ .add_buffer(values_buffer.into())
+ .build();
+ Ok(array_data)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::column::page::Page;
+ use crate::data_type::ByteArray;
+ use crate::data_type::ByteArrayType;
+ use crate::schema::parser::parse_message_type;
+ use crate::schema::types::SchemaDescriptor;
+ use crate::util::test_common::page_util::{
+ DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
+ };
+ use crate::{
+ basic::Encoding, column::page::PageReader, schema::types::SchemaDescPtr,
+ };
+ use arrow::array::{PrimitiveArray, StringArray};
+ use arrow::datatypes::Int32Type as ArrowInt32;
+ use rand::{distributions::uniform::SampleUniform, thread_rng, Rng};
+ use std::sync::Arc;
+
+ /// Iterator for testing reading empty columns
+ struct EmptyPageIterator {
+ schema: SchemaDescPtr,
+ }
+
+ impl EmptyPageIterator {
+ fn new(schema: SchemaDescPtr) -> Self {
+ EmptyPageIterator { schema }
+ }
+ }
+
+ impl Iterator for EmptyPageIterator {
+ type Item = Result<Box<dyn PageReader>>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ None
+ }
+ }
+
+ impl PageIterator for EmptyPageIterator {
+ fn schema(&mut self) -> Result<SchemaDescPtr> {
+ Ok(self.schema.clone())
+ }
+
+ fn column_schema(&mut self) -> Result<ColumnDescPtr> {
+ Ok(self.schema.column(0))
+ }
+ }
+
+ #[test]
+ fn test_array_reader_empty_pages() {
+ // Construct column schema
+ let message_type = "
+ message test_schema {
+ REQUIRED INT32 leaf;
+ }
+ ";
+
+ let schema = parse_message_type(message_type)
+ .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+ .unwrap();
+
+ let column_desc = schema.column(0);
+ let page_iterator = EmptyPageIterator::new(schema);
+
+ let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+ let mut array_reader =
+ ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+ .unwrap();
+
+ // expect no values to be read
+ let array = array_reader.next_batch(50).unwrap();
+ assert!(array.is_empty());
+ }
+
+ fn make_column_chunks<T: crate::data_type::DataType>(
+ column_desc: ColumnDescPtr,
+ encoding: Encoding,
+ num_levels: usize,
+ min_value: T::T,
+ max_value: T::T,
+ def_levels: &mut Vec<i16>,
+ rep_levels: &mut Vec<i16>,
+ values: &mut Vec<T::T>,
+ page_lists: &mut Vec<Vec<Page>>,
+ use_v2: bool,
+ num_chunks: usize,
+ ) where
+ T::T: PartialOrd + SampleUniform + Copy,
+ {
+ for _i in 0..num_chunks {
+ let mut pages = VecDeque::new();
+ let mut data = Vec::new();
+ let mut page_def_levels = Vec::new();
+ let mut page_rep_levels = Vec::new();
+
+ crate::util::test_common::make_pages::<T>(
+ column_desc.clone(),
+ encoding,
+ 1,
+ num_levels,
+ min_value,
+ max_value,
+ &mut page_def_levels,
+ &mut page_rep_levels,
+ &mut data,
+ &mut pages,
+ use_v2,
+ );
+
+ def_levels.append(&mut page_def_levels);
+ rep_levels.append(&mut page_rep_levels);
+ values.append(&mut data);
+ page_lists.push(Vec::from(pages));
+ }
+ }
+
+ #[test]
+ fn test_primitive_array_reader_data() {
+ // Construct column schema
+ let message_type = "
+ message test_schema {
+ REQUIRED INT32 leaf;
+ }
+ ";
+
+ let schema = parse_message_type(message_type)
+ .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+ .unwrap();
+
+ let column_desc = schema.column(0);
+
+ // Construct page iterator
+ {
+ let mut data = Vec::new();
+ let mut page_lists = Vec::new();
+ make_column_chunks::<crate::data_type::Int32Type>(
+ column_desc.clone(),
+ Encoding::PLAIN,
+ 100,
+ 1,
+ 200,
+ &mut Vec::new(),
+ &mut Vec::new(),
+ &mut data,
+ &mut page_lists,
+ true,
+ 2,
+ );
+ let page_iterator =
+ InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+
+ let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+ let mut array_reader =
+ ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+ .unwrap();
+
+ // Read first 50 values, which are all from the first column chunk
+ let array = array_reader.next_batch(50).unwrap();
+ let array = array
+ .as_any()
+ .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+ .unwrap();
+
+ assert_eq!(
+ &PrimitiveArray::<ArrowInt32>::from(data[0..50].to_vec()),
+ array
+ );
+
+ // Read next 100 values, the first 50 ones are from the first column chunk,
+ // and the last 50 ones are from the second column chunk
+ let array = array_reader.next_batch(100).unwrap();
+ let array = array
+ .as_any()
+ .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+ .unwrap();
+
+ assert_eq!(
+ &PrimitiveArray::<ArrowInt32>::from(data[50..150].to_vec()),
+ array
+ );
+
+ // Try to read 100 values, however there are only 50 values
+ let array = array_reader.next_batch(100).unwrap();
+ let array = array
+ .as_any()
+ .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+ .unwrap();
+
+ assert_eq!(
+ &PrimitiveArray::<ArrowInt32>::from(data[150..200].to_vec()),
+ array
+ );
+ }
+ }
+
+ #[test]
+ fn test_primitive_array_reader_def_and_rep_levels() {
+ // Construct column schema
+ let message_type = "
+ message test_schema {
+ REPEATED Group test_mid {
+ OPTIONAL INT32 leaf;
+ }
+ }
+ ";
+
+ let schema = parse_message_type(message_type)
+ .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+ .unwrap();
+
+ let column_desc = schema.column(0);
+
+ // Construct page iterator
+ {
+ let mut def_levels = Vec::new();
+ let mut rep_levels = Vec::new();
+ let mut page_lists = Vec::new();
+ make_column_chunks::<crate::data_type::Int32Type>(
+ column_desc.clone(),
+ Encoding::PLAIN,
+ 100,
+ 1,
+ 200,
+ &mut def_levels,
+ &mut rep_levels,
+ &mut Vec::new(),
+ &mut page_lists,
+ true,
+ 2,
+ );
+
+ let page_iterator =
+ InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+
+ let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+ let mut array_reader =
+ ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+ .unwrap();
+
+ let mut accu_len: usize = 0;
+
+ // Read first 50 values, which are all from the first column chunk
+ let array = array_reader.next_batch(50).unwrap();
+ assert_eq!(
+ Some(&def_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_def_levels()
+ );
+ assert_eq!(
+ Some(&rep_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_rep_levels()
+ );
+ accu_len += array.len();
+
+ // Read next 100 values, the first 50 ones are from the first column chunk,
+ // and the last 50 ones are from the second column chunk
+ let array = array_reader.next_batch(100).unwrap();
+ assert_eq!(
+ Some(&def_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_def_levels()
+ );
+ assert_eq!(
+ Some(&rep_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_rep_levels()
+ );
+ accu_len += array.len();
+
+ // Try to read 100 values, however there are only 50 values
+ let array = array_reader.next_batch(100).unwrap();
+ assert_eq!(
+ Some(&def_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_def_levels()
+ );
+ assert_eq!(
+ Some(&rep_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_rep_levels()
+ );
+
+ assert_eq!(accu_len + array.len(), 200);
+ }
+ }
+
+ #[test]
+ fn test_arrow_array_reader_string() {
+ // Construct column schema
+ let message_type = "
+ message test_schema {
+ REPEATED Group test_mid {
+ OPTIONAL BYTE_ARRAY leaf (UTF8);
+ }
+ }
+ ";
+ let num_pages = 2;
+ let values_per_page = 100;
+ let str_base = "Hello World";
+
+ let schema = parse_message_type(message_type)
+ .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+ .unwrap();
+ let column_desc = schema.column(0);
+ let max_def_level = column_desc.max_def_level();
+ let max_rep_level = column_desc.max_rep_level();
+
+ assert_eq!(max_def_level, 2);
+ assert_eq!(max_rep_level, 1);
+
+ let mut rng = thread_rng();
+ let mut pages: Vec<Vec<Page>> = Vec::new();
+
+ let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
+ let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
+ let mut all_values = Vec::with_capacity(num_pages * values_per_page);
+
+ for i in 0..num_pages {
+ let mut values = Vec::with_capacity(values_per_page);
+
+ for _ in 0..values_per_page {
+ let def_level = rng.gen_range(0..max_def_level + 1);
+ let rep_level = rng.gen_range(0..max_rep_level + 1);
+ if def_level == max_def_level {
+ let len = rng.gen_range(1..str_base.len());
+ let slice = &str_base[..len];
+ values.push(ByteArray::from(slice));
+ all_values.push(Some(slice.to_string()));
+ } else {
+ all_values.push(None)
+ }
+ rep_levels.push(rep_level);
+ def_levels.push(def_level)
+ }
+
+ let range = i * values_per_page..(i + 1) * values_per_page;
+ let mut pb =
+ DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+
+ pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
+ pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
+ pb.add_values::<ByteArrayType>(Encoding::PLAIN, values.as_slice());
+
+ let data_page = pb.consume();
+ pages.push(vec![data_page]);
+ }
+
+ let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
+ let converter = StringArrayConverter::new();
+ let mut array_reader =
+ ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+ .unwrap();
+
+ let mut accu_len: usize = 0;
+
+ let array = array_reader.next_batch(values_per_page / 2).unwrap();
+ assert_eq!(array.len(), values_per_page / 2);
+ assert_eq!(
+ Some(&def_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_def_levels()
+ );
+ assert_eq!(
+ Some(&rep_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_rep_levels()
+ );
+ accu_len += array.len();
+
+ // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
+ // and the last values_per_page/2 ones are from the second column chunk
+ let array = array_reader.next_batch(values_per_page).unwrap();
+ assert_eq!(array.len(), values_per_page);
+ assert_eq!(
+ Some(&def_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_def_levels()
+ );
+ assert_eq!(
+ Some(&rep_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_rep_levels()
+ );
+ let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+ for i in 0..array.len() {
+ if array.is_valid(i) {
+ assert_eq!(
+ all_values[i + accu_len].as_ref().unwrap().as_str(),
+ strings.value(i)
+ )
+ } else {
+ assert_eq!(all_values[i + accu_len], None)
+ }
+ }
+ accu_len += array.len();
+
+ // Try to read values_per_page values, however there are only values_per_page/2 values
+ let array = array_reader.next_batch(values_per_page).unwrap();
+ assert_eq!(array.len(), values_per_page / 2);
+ assert_eq!(
+ Some(&def_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_def_levels()
+ );
+ assert_eq!(
+ Some(&rep_levels[accu_len..(accu_len + array.len())]),
+ array_reader.get_rep_levels()
+ );
+ }
+}
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index b1aa39e..afb1fdc 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -49,10 +49,11 @@
//!}
//! ```
-pub(in crate::arrow) mod array_reader;
+pub mod array_reader;
+pub mod arrow_array_reader;
pub mod arrow_reader;
pub mod arrow_writer;
-pub(in crate::arrow) mod converter;
+pub mod converter;
pub(in crate::arrow) mod levels;
pub(in crate::arrow) mod record_reader;
pub mod schema;
diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs
index 7e3b6a8..4dd7da9 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -464,6 +464,14 @@
}
}
+ impl Iterator for TestPageReader {
+ type Item = Result<Page>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.get_next_page().transpose()
+ }
+ }
+
#[test]
fn test_read_required_records() {
// Construct column schema
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index b351578..b75d3b5 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -28,6 +28,7 @@
/// List of supported pages.
/// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf` which
/// used to store uncompressed bytes of the page.
+#[derive(Clone)]
pub enum Page {
DataPage {
buf: ByteBufferPtr,
@@ -188,7 +189,7 @@
/// API for reading pages from a column chunk.
/// This offers a iterator like API to get the next page.
-pub trait PageReader {
+pub trait PageReader: Iterator<Item = Result<Page>> {
/// Gets the next page in the column chunk associated with this reader.
/// Returns `None` if there are no pages left.
fn get_next_page(&mut self) -> Result<Option<Page>>;
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 1181565..63be17b 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -1353,4 +1353,12 @@
Ok(self.pages.next())
}
}
+
+ impl Iterator for TestPageReader {
+ type Item = Result<Page>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.get_next_page().transpose()
+ }
+ }
}
diff --git a/parquet/src/encodings/mod.rs b/parquet/src/encodings/mod.rs
index 33b1e23..6046dda 100644
--- a/parquet/src/encodings/mod.rs
+++ b/parquet/src/encodings/mod.rs
@@ -18,4 +18,4 @@
pub mod decoding;
pub mod encoding;
pub mod levels;
-mod rle;
+pub(crate) mod rle;
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index 021c1f0..be1a221 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -22,7 +22,7 @@
#[cfg(any(feature = "arrow", test))]
use arrow::error::ArrowError;
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Clone)]
pub enum ParquetError {
/// General Parquet error.
/// Returned when code violates normal workflow of working with Parquet files.
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 0877e62..a4d79a3 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -271,6 +271,14 @@
}
}
+impl<T: Read> Iterator for SerializedPageReader<T> {
+ type Item = Result<Page>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.get_next_page().transpose()
+ }
+}
+
impl<T: Read> PageReader for SerializedPageReader<T> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
while self.seen_num_values < self.total_num_values {
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index a931b95..900e2b5 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -46,7 +46,7 @@
pub use self::util::memory;
#[macro_use]
-mod util;
+pub mod util;
#[cfg(any(feature = "arrow", test))]
pub mod arrow;
pub mod column;
diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs
index 03b2500..1aa8c26 100644
--- a/parquet/src/schema/types.rs
+++ b/parquet/src/schema/types.rs
@@ -757,11 +757,13 @@
}
/// Returns maximum definition level for this column.
+ #[inline]
pub fn max_def_level(&self) -> i16 {
self.max_def_level
}
/// Returns maximum repetition level for this column.
+ #[inline]
pub fn max_rep_level(&self) -> i16 {
self.max_rep_level
}
diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs
index 57d0c24..1642a4b 100644
--- a/parquet/src/util/memory.rs
+++ b/parquet/src/util/memory.rs
@@ -292,6 +292,7 @@
}
/// Returns slice of data in this buffer.
+ #[inline]
pub fn data(&self) -> &[T] {
&self.data[self.start..self.start + self.len]
}
@@ -299,12 +300,20 @@
/// Updates this buffer with new `start` position and length `len`.
///
/// Range should be within current start position and length.
+ #[inline]
pub fn with_range(mut self, start: usize, len: usize) -> Self {
- assert!(start <= self.len);
- assert!(start + len <= self.len);
+ self.set_range(start, len);
+ self
+ }
+
+ /// Updates this buffer with new `start` position and length `len`.
+ ///
+ /// Range should be within current start position and length.
+ #[inline]
+ pub fn set_range(&mut self, start: usize, len: usize) {
+ assert!(self.start <= start && start + len <= self.start + self.len);
self.start = start;
self.len = len;
- self
}
/// Adds memory tracker to this buffer.
@@ -314,16 +323,19 @@
}
/// Returns start position of this buffer.
+ #[inline]
pub fn start(&self) -> usize {
self.start
}
/// Returns length of this buffer
+ #[inline]
pub fn len(&self) -> usize {
self.len
}
/// Returns whether this buffer is empty
+ #[inline]
pub fn is_empty(&self) -> bool {
self.len == 0
}
@@ -393,6 +405,7 @@
}
impl AsRef<[u8]> for BufferPtr<u8> {
+ #[inline]
fn as_ref(&self) -> &[u8] {
&self.data[self.start..self.start + self.len]
}
diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs
index af9a1aa..8f6d85d 100644
--- a/parquet/src/util/mod.rs
+++ b/parquet/src/util/mod.rs
@@ -22,6 +22,7 @@
mod bit_packing;
pub mod cursor;
pub mod hash_util;
-
-#[cfg(test)]
-pub mod test_common;
+pub(crate) mod test_common;
+pub use self::test_common::page_util::{
+ DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
+};
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 2e0e8e9..581845a 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -32,7 +32,6 @@
use std::collections::VecDeque;
use std::mem;
use std::sync::Arc;
-use std::vec::IntoIter;
pub trait DataPageBuilder {
fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
@@ -78,6 +77,9 @@
// Adds levels to the buffer and return number of encoded bytes
fn add_levels(&mut self, max_level: i16, levels: &[i16]) -> u32 {
+ if max_level <= 0 {
+ return 0;
+ }
let size = max_buffer_size(Encoding::RLE, max_level, levels.len());
let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, vec![0; size]);
level_encoder.put(levels).expect("put() should be OK");
@@ -163,60 +165,65 @@
}
/// A utility page reader which stores pages in memory.
-pub struct InMemoryPageReader {
- pages: Box<dyn Iterator<Item = Page>>,
+pub struct InMemoryPageReader<P: Iterator<Item = Page>> {
+ page_iter: P,
}
-impl InMemoryPageReader {
- pub fn new(pages: Vec<Page>) -> Self {
+impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
+ pub fn new(pages: impl IntoIterator<Item = Page, IntoIter = P>) -> Self {
Self {
- pages: Box::new(pages.into_iter()),
+ page_iter: pages.into_iter(),
}
}
}
-impl PageReader for InMemoryPageReader {
+impl<P: Iterator<Item = Page>> PageReader for InMemoryPageReader<P> {
fn get_next_page(&mut self) -> Result<Option<Page>> {
- Ok(self.pages.next())
+ Ok(self.page_iter.next())
+ }
+}
+
+impl<P: Iterator<Item = Page>> Iterator for InMemoryPageReader<P> {
+ type Item = Result<Page>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ self.get_next_page().transpose()
}
}
/// A utility page iterator which stores page readers in memory, used for tests.
-pub struct InMemoryPageIterator {
+#[derive(Clone)]
+pub struct InMemoryPageIterator<I: Iterator<Item = Vec<Page>>> {
schema: SchemaDescPtr,
column_desc: ColumnDescPtr,
- page_readers: IntoIter<Box<dyn PageReader>>,
+ page_reader_iter: I,
}
-impl InMemoryPageIterator {
+impl<I: Iterator<Item = Vec<Page>>> InMemoryPageIterator<I> {
pub fn new(
schema: SchemaDescPtr,
column_desc: ColumnDescPtr,
- pages: Vec<Vec<Page>>,
+ pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>,
) -> Self {
- let page_readers = pages
- .into_iter()
- .map(|pages| Box::new(InMemoryPageReader::new(pages)) as Box<dyn PageReader>)
- .collect::<Vec<Box<dyn PageReader>>>()
- .into_iter();
-
Self {
schema,
column_desc,
- page_readers,
+ page_reader_iter: pages.into_iter(),
}
}
}
-impl Iterator for InMemoryPageIterator {
+impl<I: Iterator<Item = Vec<Page>>> Iterator for InMemoryPageIterator<I> {
type Item = Result<Box<dyn PageReader>>;
fn next(&mut self) -> Option<Self::Item> {
- self.page_readers.next().map(Ok)
+ self.page_reader_iter
+ .next()
+ .map(|x| Ok(Box::new(InMemoryPageReader::new(x)) as Box<dyn PageReader>))
}
}
-impl PageIterator for InMemoryPageIterator {
+impl<I: Iterator<Item = Vec<Page>>> PageIterator for InMemoryPageIterator<I> {
fn schema(&mut self) -> Result<SchemaDescPtr> {
Ok(self.schema.clone())
}