Implement faster arrow array reader (#384)

* implement ArrowArrayReader (see usage sketch below)

* change StringArrayConverter to use push_unchecked for offsets

* add ASF license header to new files

* fix clippy issues

* cleanup arrow_array_reader benches

* cleanup arrow_array_reader

* change util module to limit public exports from test_common sub-module

* fix rustfmt issues
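In brief, the new reader is built from a page iterator plus an `ArrayConverter`. A minimal usage sketch for a UTF8 column, mirroring the benchmark added below (`page_iterator: impl PageIterator + 'static` and `column_desc: ColumnDescPtr` assumed in hand):

```rust
use parquet::arrow::array_reader::ArrayReader;
use parquet::arrow::arrow_array_reader::{ArrowArrayReader, StringArrayConverter};

// passing None for the arrow type infers it from the parquet column descriptor
let converter = StringArrayConverter::new();
let mut reader =
    ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap();
// read up to 8192 values as an arrow StringArray
let batch = reader.next_batch(8192).unwrap();
```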
diff --git a/arrow/src/array/data.rs b/arrow/src/array/data.rs
index 172bdaa..64767e3 100644
--- a/arrow/src/array/data.rs
+++ b/arrow/src/array/data.rs
@@ -506,6 +506,11 @@
         self
     }
 
+    pub fn null_count(mut self, null_count: usize) -> Self {
+        self.null_count = Some(null_count);
+        self
+    }
+
     pub fn null_bit_buffer(mut self, buf: Buffer) -> Self {
         self.null_bit_buffer = Some(buf);
         self
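For reference, the new setter slots into the existing builder chain; a sketch with illustrative values (4 int32 slots, validity bits `0b1011`, so slot 2 is null):

```rust
use arrow::array::ArrayData;
use arrow::buffer::Buffer;
use arrow::datatypes::{DataType, ToByteSlice};

// supplying the null count up front avoids recounting validity bits later
let data = ArrayData::builder(DataType::Int32)
    .len(4)
    .null_count(1)
    .add_buffer(Buffer::from([1i32, 2, 0, 4].to_byte_slice()))
    .null_bit_buffer(Buffer::from([0b00001011u8]))
    .build();
```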
diff --git a/arrow/src/array/transform/mod.rs b/arrow/src/array/transform/mod.rs
index 3fae220..0194b93 100644
--- a/arrow/src/array/transform/mod.rs
+++ b/arrow/src/array/transform/mod.rs
@@ -25,7 +25,7 @@
 
 use super::{
     data::{into_buffers, new_buffers},
-    ArrayData,
+    ArrayData, ArrayDataBuilder,
 };
 use crate::array::StringOffsetSizeTrait;
 
@@ -63,7 +63,7 @@
 }
 
 impl<'a> _MutableArrayData<'a> {
-    fn freeze(self, dictionary: Option<ArrayData>) -> ArrayData {
+    fn freeze(self, dictionary: Option<ArrayData>) -> ArrayDataBuilder {
         let buffers = into_buffers(&self.data_type, self.buffer1, self.buffer2);
 
         let child_data = match self.data_type {
@@ -76,19 +76,19 @@
                 child_data
             }
         };
-        ArrayData::new(
-            self.data_type,
-            self.len,
-            Some(self.null_count),
-            if self.null_count > 0 {
-                Some(self.null_buffer.into())
-            } else {
-                None
-            },
-            0,
-            buffers,
-            child_data,
-        )
+
+        let mut array_data_builder = ArrayDataBuilder::new(self.data_type)
+            .offset(0)
+            .len(self.len)
+            .null_count(self.null_count)
+            .buffers(buffers)
+            .child_data(child_data);
+        if self.null_count > 0 {
+            array_data_builder =
+                array_data_builder.null_bit_buffer(self.null_buffer.into());
+        }
+
+        array_data_builder
     }
 }
 
@@ -552,8 +552,13 @@
             .map(|array| build_extend_null_bits(array, use_nulls))
             .collect();
 
-        let null_bytes = bit_util::ceil(array_capacity, 8);
-        let null_buffer = MutableBuffer::from_len_zeroed(null_bytes);
+        let null_buffer = if use_nulls {
+            let null_bytes = bit_util::ceil(array_capacity, 8);
+            MutableBuffer::from_len_zeroed(null_bytes)
+        } else {
+            // create a zero-capacity mutable buffer; it is not expected to be used
+            MutableBuffer::with_capacity(0)
+        };
 
         let extend_values = match &data_type {
             DataType::Dictionary(_, _) => {
@@ -605,13 +610,40 @@
 
     /// Extends this [MutableArrayData] with null elements, disregarding the bound arrays
     pub fn extend_nulls(&mut self, len: usize) {
+        // TODO: null_buffer should probably be extended here as well,
+        // otherwise is_valid() could later panic;
+        // add a test to confirm
         self.data.null_count += len;
         (self.extend_nulls)(&mut self.data, len);
         self.data.len += len;
     }
 
+    /// Returns the current length
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.data.len
+    }
+
+    /// Returns `true` if the length is 0
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.data.len == 0
+    }
+
+    /// Returns the current null count
+    #[inline]
+    pub fn null_count(&self) -> usize {
+        self.data.null_count
+    }
+
     /// Creates a [ArrayData] from the pushed regions up to this point, consuming `self`.
     pub fn freeze(self) -> ArrayData {
+        self.data.freeze(self.dictionary).build()
+    }
+
+    /// Creates an [ArrayDataBuilder] from the pushed regions up to this point, consuming `self`.
+    /// This is useful for extending the default behavior of MutableArrayData.
+    pub fn into_builder(self) -> ArrayDataBuilder {
         self.data.freeze(self.dictionary)
     }
 }
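A sketch of the new `into_builder` escape hatch (`a` and `b` assumed to be existing `ArrayData` values; `freeze` remains the default path):

```rust
use arrow::array::MutableArrayData;

let mut mutable = MutableArrayData::new(vec![&a, &b], false, 5);
mutable.extend(0, 0, 3); // copy a[0..3]
mutable.extend(1, 0, 2); // copy b[0..2]
assert_eq!(mutable.len(), 5);

// keep the builder instead of freezing, e.g. to adjust metadata before building
let data = mutable.into_builder().build();
```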
diff --git a/arrow/src/buffer/mutable.rs b/arrow/src/buffer/mutable.rs
index 35b123d..7d336e0 100644
--- a/arrow/src/buffer/mutable.rs
+++ b/arrow/src/buffer/mutable.rs
@@ -327,10 +327,10 @@
     }
 
     /// Extends the buffer with a new item, without checking for sufficient capacity
-    /// Safety
+    /// # Safety
     /// Caller must ensure that `capacity() - len() >= size_of::<T>()`
     #[inline]
-    unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
+    pub unsafe fn push_unchecked<T: ToByteSlice>(&mut self, item: T) {
         let additional = std::mem::size_of::<T>();
         let dst = self.data.as_ptr().add(self.len) as *mut T;
         std::ptr::write(dst, item);
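`push_unchecked` is what the new `StringArrayConverter` uses to write offsets (per the commit message). A sketch of the safety contract in isolation:

```rust
use arrow::buffer::MutableBuffer;

let offsets: [i32; 4] = [0, 3, 7, 12];
// reserve all the space once, so capacity() - len() >= size_of::<i32>()
// holds for every push in the loop below
let mut buffer = MutableBuffer::new(offsets.len() * std::mem::size_of::<i32>());
for offset in &offsets {
    // SAFETY: sufficient capacity was reserved above
    unsafe { buffer.push_unchecked(*offset) };
}
```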
diff --git a/arrow/src/compute/kernels/filter.rs b/arrow/src/compute/kernels/filter.rs
index b15692e..c5efadd 100644
--- a/arrow/src/compute/kernels/filter.rs
+++ b/arrow/src/compute/kernels/filter.rs
@@ -44,10 +44,10 @@
 /// slots of a [BooleanArray] are true. Each interval corresponds to a contiguous region of memory to be
 /// "taken" from an array to be filtered.
 #[derive(Debug)]
-pub(crate) struct SlicesIterator<'a> {
+pub struct SlicesIterator<'a> {
     iter: Enumerate<BitChunkIterator<'a>>,
     state: State,
-    filter_count: usize,
+    filter: &'a BooleanArray,
     remainder_mask: u64,
     remainder_len: usize,
     chunk_len: usize,
@@ -59,19 +59,14 @@
 }
 
 impl<'a> SlicesIterator<'a> {
-    pub(crate) fn new(filter: &'a BooleanArray) -> Self {
+    pub fn new(filter: &'a BooleanArray) -> Self {
         let values = &filter.data_ref().buffers()[0];
-
-        // this operation is performed before iteration
-        // because it is fast and allows reserving all the needed memory
-        let filter_count = values.count_set_bits_offset(filter.offset(), filter.len());
-
         let chunks = values.bit_chunks(filter.offset(), filter.len());
 
         Self {
             iter: chunks.iter().enumerate(),
             state: State::Chunks,
-            filter_count,
+            filter,
             remainder_len: chunks.remainder_len(),
             chunk_len: chunks.chunk_len(),
             remainder_mask: chunks.remainder_bits(),
@@ -83,6 +78,12 @@
         }
     }
 
+    /// Counts the number of set bits in the filter array.
+    fn filter_count(&self) -> usize {
+        let values = self.filter.values();
+        values.count_set_bits_offset(self.filter.offset(), self.filter.len())
+    }
+
     #[inline]
     fn current_start(&self) -> usize {
         self.current_chunk * 64 + self.current_bit
@@ -193,7 +194,7 @@
 /// Therefore, it is considered undefined behavior to pass `filter` with null values.
 pub fn build_filter(filter: &BooleanArray) -> Result<Filter> {
     let iter = SlicesIterator::new(filter);
-    let filter_count = iter.filter_count;
+    let filter_count = iter.filter_count();
     let chunks = iter.collect::<Vec<_>>();
 
     Ok(Box::new(move |array: &ArrayData| {
@@ -253,7 +254,8 @@
     }
 
     let iter = SlicesIterator::new(predicate);
-    match iter.filter_count {
+    let filter_count = iter.filter_count();
+    match filter_count {
         0 => {
             // return empty
             Ok(new_empty_array(array.data_type()))
@@ -266,7 +268,7 @@
         _ => {
             // actually filter
             let mut mutable =
-                MutableArrayData::new(vec![array.data_ref()], false, iter.filter_count);
+                MutableArrayData::new(vec![array.data_ref()], false, filter_count);
             iter.for_each(|(start, end)| mutable.extend(0, start, end));
             let data = mutable.freeze();
             Ok(make_array(data))
@@ -599,7 +601,7 @@
         let filter = BooleanArray::from(filter_values);
 
         let iter = SlicesIterator::new(&filter);
-        let filter_count = iter.filter_count;
+        let filter_count = iter.filter_count();
         let chunks = iter.collect::<Vec<_>>();
 
         assert_eq!(chunks, vec![(1, 2)]);
@@ -612,7 +614,7 @@
         let filter = BooleanArray::from(filter_values);
 
         let iter = SlicesIterator::new(&filter);
-        let filter_count = iter.filter_count;
+        let filter_count = iter.filter_count();
         let chunks = iter.collect::<Vec<_>>();
 
         assert_eq!(chunks, vec![(0, 1), (2, 64)]);
@@ -625,7 +627,7 @@
         let filter = BooleanArray::from(filter_values);
 
         let iter = SlicesIterator::new(&filter);
-        let filter_count = iter.filter_count;
+        let filter_count = iter.filter_count();
         let chunks = iter.collect::<Vec<_>>();
 
         assert_eq!(chunks, vec![(1, 62), (63, 124), (125, 130)]);
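With `SlicesIterator` now public, the slice computation can be reused outside the filter kernel; a sketch (the `(start, end)` pairs are half-open, as in the tests above):

```rust
use arrow::array::BooleanArray;
use arrow::compute::kernels::filter::SlicesIterator;

let filter = BooleanArray::from(vec![false, true, true, false, true]);
let slices: Vec<(usize, usize)> = SlicesIterator::new(&filter).collect();
assert_eq!(slices, vec![(1, 3), (4, 5)]);
```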
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index 1e54047..d66ee23 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -45,6 +45,7 @@
 base64 = { version = "0.13", optional = true }
 clap = { version = "2.33.3", optional = true }
 serde_json = { version = "1.0", features = ["preserve_order"], optional = true }
+rand = "0.8"
 
 [dev-dependencies]
 criterion = "0.3"
@@ -76,3 +77,7 @@
 [[bench]]
 name = "arrow_writer"
 harness = false
+
+[[bench]]
+name = "arrow_array_reader"
+harness = false
\ No newline at end of file
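With the bench target registered above, the old-vs-new comparison suite can be run via `cargo bench --bench arrow_array_reader` from the `parquet` crate directory.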
diff --git a/parquet/benches/arrow_array_reader.rs b/parquet/benches/arrow_array_reader.rs
new file mode 100644
index 0000000..6e87512
--- /dev/null
+++ b/parquet/benches/arrow_array_reader.rs
@@ -0,0 +1,766 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::{criterion_group, criterion_main, Criterion};
+use parquet::util::{DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator};
+use parquet::{
+    arrow::array_reader::ArrayReader,
+    basic::Encoding,
+    column::page::PageIterator,
+    data_type::{ByteArrayType, Int32Type},
+    schema::types::{ColumnDescPtr, SchemaDescPtr},
+};
+use std::{collections::VecDeque, sync::Arc};
+
+fn build_test_schema() -> SchemaDescPtr {
+    use parquet::schema::{parser::parse_message_type, types::SchemaDescriptor};
+    let message_type = "
+        message test_schema {
+            REQUIRED INT32 mandatory_int32_leaf;
+            REPEATED Group test_mid_int32 {
+                OPTIONAL INT32 optional_int32_leaf;
+            }
+            REQUIRED BYTE_ARRAY mandatory_string_leaf (UTF8);
+            REPEATED Group test_mid_string {
+                OPTIONAL BYTE_ARRAY optional_string_leaf (UTF8);
+            }
+        }
+        ";
+    parse_message_type(message_type)
+        .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+        .unwrap()
+}
+
+// test data params
+const NUM_ROW_GROUPS: usize = 1;
+const PAGES_PER_GROUP: usize = 2;
+const VALUES_PER_PAGE: usize = 10_000;
+const BATCH_SIZE: usize = 8192;
+
+use rand::{rngs::StdRng, Rng, SeedableRng};
+
+pub fn seedable_rng() -> StdRng {
+    StdRng::seed_from_u64(42)
+}
+
+fn build_plain_encoded_int32_page_iterator(
+    schema: SchemaDescPtr,
+    column_desc: ColumnDescPtr,
+    null_density: f32,
+) -> impl PageIterator + Clone {
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![0; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    let mut int32_value = 0;
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    int32_value += 1;
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_int32_page_iterator(
+    schema: SchemaDescPtr,
+    column_desc: ColumnDescPtr,
+    null_density: f32,
+) -> impl PageIterator + Clone {
+    use parquet::encoding::{DictEncoder, Encoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![0; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = (0..NUM_UNIQUE_VALUES)
+        .map(|x| (x + 1) as i32)
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder =
+            DictEncoder::<Int32Type>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let int32_value = unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)];
+                    values.push(int32_value);
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_plain_encoded_string_page_iterator(
+    schema: SchemaDescPtr,
+    column_desc: ColumnDescPtr,
+    null_density: f32,
+) -> impl PageIterator + Clone {
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![0; VALUES_PER_PAGE];
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = Vec::new();
+        for j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    let string_value =
+                        format!("Test value {}, row group: {}, page: {}", k, i, j);
+                    values
+                        .push(parquet::data_type::ByteArray::from(string_value.as_str()));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            page_builder.add_values::<ByteArrayType>(Encoding::PLAIN, &values);
+            column_chunk_pages.push(page_builder.consume());
+        }
+        pages.push(column_chunk_pages);
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn build_dictionary_encoded_string_page_iterator(
+    schema: SchemaDescPtr,
+    column_desc: ColumnDescPtr,
+    null_density: f32,
+) -> impl PageIterator + Clone {
+    use parquet::encoding::{DictEncoder, Encoder};
+    let max_def_level = column_desc.max_def_level();
+    let max_rep_level = column_desc.max_rep_level();
+    let rep_levels = vec![0; VALUES_PER_PAGE];
+    // generate 1% unique values
+    const NUM_UNIQUE_VALUES: usize = VALUES_PER_PAGE / 100;
+    let unique_values = (0..NUM_UNIQUE_VALUES)
+        .map(|x| format!("Dictionary value {}", x))
+        .collect::<Vec<_>>();
+    let mut rng = seedable_rng();
+    let mut pages: Vec<Vec<parquet::column::page::Page>> = Vec::new();
+    for _i in 0..NUM_ROW_GROUPS {
+        let mut column_chunk_pages = VecDeque::new();
+        let mem_tracker = Arc::new(parquet::memory::MemTracker::new());
+        let mut dict_encoder =
+            DictEncoder::<ByteArrayType>::new(column_desc.clone(), mem_tracker);
+        // add data pages
+        for _j in 0..PAGES_PER_GROUP {
+            // generate page
+            let mut values = Vec::with_capacity(VALUES_PER_PAGE);
+            let mut def_levels = Vec::with_capacity(VALUES_PER_PAGE);
+            for _k in 0..VALUES_PER_PAGE {
+                let def_level = if rng.gen::<f32>() < null_density {
+                    max_def_level - 1
+                } else {
+                    max_def_level
+                };
+                if def_level == max_def_level {
+                    // select random value from list of unique values
+                    let string_value =
+                        unique_values[rng.gen_range(0..NUM_UNIQUE_VALUES)].as_str();
+                    values.push(parquet::data_type::ByteArray::from(string_value));
+                }
+                def_levels.push(def_level);
+            }
+            let mut page_builder =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+            page_builder.add_rep_levels(max_rep_level, &rep_levels);
+            page_builder.add_def_levels(max_def_level, &def_levels);
+            let _ = dict_encoder.put(&values);
+            let indices = dict_encoder
+                .write_indices()
+                .expect("write_indices() should be OK");
+            page_builder.add_indices(indices);
+            column_chunk_pages.push_back(page_builder.consume());
+        }
+        // add dictionary page
+        let dict = dict_encoder
+            .write_dict()
+            .expect("write_dict() should be OK");
+        let dict_page = parquet::column::page::Page::DictionaryPage {
+            buf: dict,
+            num_values: dict_encoder.num_entries() as u32,
+            encoding: Encoding::RLE_DICTIONARY,
+            is_sorted: false,
+        };
+        column_chunk_pages.push_front(dict_page);
+        pages.push(column_chunk_pages.into());
+    }
+
+    InMemoryPageIterator::new(schema, column_desc, pages)
+}
+
+fn bench_array_reader(mut array_reader: impl ArrayReader) -> usize {
+    // test procedure: read data in batches of 8192 until no more data
+    let mut total_count = 0;
+    loop {
+        let array = array_reader.next_batch(BATCH_SIZE);
+        let array_len = array.unwrap().len();
+        total_count += array_len;
+        if array_len < BATCH_SIZE {
+            break;
+        }
+    }
+    total_count
+}
+
+fn create_int32_arrow_array_reader(
+    page_iterator: impl PageIterator + 'static,
+    column_desc: ColumnDescPtr,
+) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{ArrowArrayReader, PrimitiveArrayConverter};
+    let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+    ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap()
+}
+
+fn create_int32_primitive_array_reader(
+    page_iterator: impl PageIterator + 'static,
+    column_desc: ColumnDescPtr,
+) -> impl ArrayReader {
+    use parquet::arrow::array_reader::PrimitiveArrayReader;
+    PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
+        .unwrap()
+}
+
+fn create_string_arrow_array_reader(
+    page_iterator: impl PageIterator + 'static,
+    column_desc: ColumnDescPtr,
+) -> impl ArrayReader {
+    use parquet::arrow::arrow_array_reader::{ArrowArrayReader, StringArrayConverter};
+    let converter = StringArrayConverter::new();
+    ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap()
+}
+
+fn create_string_complex_array_reader(
+    page_iterator: impl PageIterator + 'static,
+    column_desc: ColumnDescPtr,
+) -> impl ArrayReader {
+    use parquet::arrow::array_reader::ComplexObjectArrayReader;
+    use parquet::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
+    let converter = Utf8Converter::new(Utf8ArrayConverter {});
+    ComplexObjectArrayReader::<parquet::data_type::ByteArrayType, Utf8Converter>::new(
+        Box::new(page_iterator),
+        column_desc,
+        converter,
+        None,
+    )
+    .unwrap()
+}
+
+fn add_benches(c: &mut Criterion) {
+    const EXPECTED_VALUE_COUNT: usize =
+        NUM_ROW_GROUPS * PAGES_PER_GROUP * VALUES_PER_PAGE;
+    let mut group = c.benchmark_group("arrow_array_reader");
+
+    let mut count: usize = 0;
+
+    let schema = build_test_schema();
+    let mandatory_int32_column_desc = schema.column(0);
+    let optional_int32_column_desc = schema.column(1);
+    let mandatory_string_column_desc = schema.column(2);
+    // println!("mandatory_string_column_desc: {:?}", mandatory_string_column_desc);
+    let optional_string_column_desc = schema.column(3);
+    // println!("optional_string_column_desc: {:?}", optional_string_column_desc);
+
+    // primitive / int32 benchmarks
+    // =============================
+
+    // int32, plain encoded, no NULLs
+    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
+        schema.clone(),
+        mandatory_int32_column_desc.clone(),
+        0.0,
+    );
+    group.bench_function(
+        "read Int32Array, plain encoded, mandatory, no NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_primitive_array_reader(
+                    plain_int32_no_null_data.clone(),
+                    mandatory_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read Int32Array, plain encoded, mandatory, no NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_arrow_array_reader(
+                    plain_int32_no_null_data.clone(),
+                    mandatory_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    let plain_int32_no_null_data = build_plain_encoded_int32_page_iterator(
+        schema.clone(),
+        optional_int32_column_desc.clone(),
+        0.0,
+    );
+    group.bench_function(
+        "read Int32Array, plain encoded, optional, no NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_primitive_array_reader(
+                    plain_int32_no_null_data.clone(),
+                    optional_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read Int32Array, plain encoded, optional, no NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_arrow_array_reader(
+                    plain_int32_no_null_data.clone(),
+                    optional_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    // int32, plain encoded, half NULLs
+    let plain_int32_half_null_data = build_plain_encoded_int32_page_iterator(
+        schema.clone(),
+        optional_int32_column_desc.clone(),
+        0.5,
+    );
+    group.bench_function(
+        "read Int32Array, plain encoded, optional, half NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_primitive_array_reader(
+                    plain_int32_half_null_data.clone(),
+                    optional_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read Int32Array, plain encoded, optional, half NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_arrow_array_reader(
+                    plain_int32_half_null_data.clone(),
+                    optional_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    // int32, dictionary encoded, no NULLs
+    let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator(
+        schema.clone(),
+        mandatory_int32_column_desc.clone(),
+        0.0,
+    );
+    group.bench_function(
+        "read Int32Array, dictionary encoded, mandatory, no NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_primitive_array_reader(
+                    dictionary_int32_no_null_data.clone(),
+                    mandatory_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read Int32Array, dictionary encoded, mandatory, no NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_arrow_array_reader(
+                    dictionary_int32_no_null_data.clone(),
+                    mandatory_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    let dictionary_int32_no_null_data = build_dictionary_encoded_int32_page_iterator(
+        schema.clone(),
+        optional_int32_column_desc.clone(),
+        0.0,
+    );
+    group.bench_function(
+        "read Int32Array, dictionary encoded, optional, no NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_primitive_array_reader(
+                    dictionary_int32_no_null_data.clone(),
+                    optional_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read Int32Array, dictionary encoded, optional, no NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_arrow_array_reader(
+                    dictionary_int32_no_null_data.clone(),
+                    optional_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    // int32, dictionary encoded, half NULLs
+    let dictionary_int32_half_null_data = build_dictionary_encoded_int32_page_iterator(
+        schema.clone(),
+        optional_int32_column_desc.clone(),
+        0.5,
+    );
+    group.bench_function(
+        "read Int32Array, dictionary encoded, optional, half NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_primitive_array_reader(
+                    dictionary_int32_half_null_data.clone(),
+                    optional_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read Int32Array, dictionary encoded, optional, half NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_int32_arrow_array_reader(
+                    dictionary_int32_half_null_data.clone(),
+                    optional_int32_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    // string benchmarks
+    //==============================
+
+    // string, plain encoded, no NULLs
+    let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
+        schema.clone(),
+        mandatory_string_column_desc.clone(),
+        0.0,
+    );
+    group.bench_function(
+        "read StringArray, plain encoded, mandatory, no NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_complex_array_reader(
+                    plain_string_no_null_data.clone(),
+                    mandatory_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read StringArray, plain encoded, mandatory, no NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_arrow_array_reader(
+                    plain_string_no_null_data.clone(),
+                    mandatory_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    let plain_string_no_null_data = build_plain_encoded_string_page_iterator(
+        schema.clone(),
+        optional_string_column_desc.clone(),
+        0.0,
+    );
+    group.bench_function(
+        "read StringArray, plain encoded, optional, no NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_complex_array_reader(
+                    plain_string_no_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read StringArray, plain encoded, optional, no NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_arrow_array_reader(
+                    plain_string_no_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    // string, plain encoded, half NULLs
+    let plain_string_half_null_data = build_plain_encoded_string_page_iterator(
+        schema.clone(),
+        optional_string_column_desc.clone(),
+        0.5,
+    );
+    group.bench_function(
+        "read StringArray, plain encoded, optional, half NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_complex_array_reader(
+                    plain_string_half_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read StringArray, plain encoded, optional, half NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_arrow_array_reader(
+                    plain_string_half_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    // string, dictionary encoded, no NULLs
+    let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
+        schema.clone(),
+        mandatory_string_column_desc.clone(),
+        0.0,
+    );
+    group.bench_function(
+        "read StringArray, dictionary encoded, mandatory, no NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_complex_array_reader(
+                    dictionary_string_no_null_data.clone(),
+                    mandatory_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read StringArray, dictionary encoded, mandatory, no NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_arrow_array_reader(
+                    dictionary_string_no_null_data.clone(),
+                    mandatory_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    let dictionary_string_no_null_data = build_dictionary_encoded_string_page_iterator(
+        schema.clone(),
+        optional_string_column_desc.clone(),
+        0.0,
+    );
+    group.bench_function(
+        "read StringArray, dictionary encoded, optional, no NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_complex_array_reader(
+                    dictionary_string_no_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read StringArray, dictionary encoded, optional, no NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_arrow_array_reader(
+                    dictionary_string_no_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    // string, dictionary encoded, half NULLs
+    let dictionary_string_half_null_data = build_dictionary_encoded_string_page_iterator(
+        schema,
+        optional_string_column_desc.clone(),
+        0.5,
+    );
+    group.bench_function(
+        "read StringArray, dictionary encoded, optional, half NULLs - old",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_complex_array_reader(
+                    dictionary_string_half_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.bench_function(
+        "read StringArray, dictionary encoded, optional, half NULLs - new",
+        |b| {
+            b.iter(|| {
+                let array_reader = create_string_arrow_array_reader(
+                    dictionary_string_half_null_data.clone(),
+                    optional_string_column_desc.clone(),
+                );
+                count = bench_array_reader(array_reader);
+            })
+        },
+    );
+    assert_eq!(count, EXPECTED_VALUE_COUNT);
+
+    group.finish();
+}
+
+criterion_group!(benches, add_benches);
+criterion_main!(benches);
diff --git a/parquet/src/arrow/array_reader.rs b/parquet/src/arrow/array_reader.rs
index f54e446..bd57cf3 100644
--- a/parquet/src/arrow/array_reader.rs
+++ b/parquet/src/arrow/array_reader.rs
@@ -60,7 +60,7 @@
     Int96ArrayConverter, Int96Converter, IntervalDayTimeArrayConverter,
     IntervalDayTimeConverter, IntervalYearMonthArrayConverter,
     IntervalYearMonthConverter, LargeBinaryArrayConverter, LargeBinaryConverter,
-    LargeUtf8ArrayConverter, LargeUtf8Converter, Utf8ArrayConverter, Utf8Converter,
+    LargeUtf8ArrayConverter, LargeUtf8Converter,
 };
 use crate::arrow::record_reader::RecordReader;
 use crate::arrow::schema::parquet_to_arrow_field;
@@ -570,7 +570,7 @@
     T: DataType,
     C: Converter<Vec<Option<T::T>>, ArrayRef> + 'static,
 {
-    fn new(
+    pub fn new(
         pages: Box<dyn PageIterator>,
         column_desc: ColumnDescPtr,
         converter: C,
@@ -1499,12 +1499,12 @@
                             arrow_type,
                         )?))
                     } else {
-                        let converter = Utf8Converter::new(Utf8ArrayConverter {});
-                        Ok(Box::new(ComplexObjectArrayReader::<
-                            ByteArrayType,
-                            Utf8Converter,
-                        >::new(
-                            page_iterator,
+                        use crate::arrow::arrow_array_reader::{
+                            ArrowArrayReader, StringArrayConverter,
+                        };
+                        let converter = StringArrayConverter::new();
+                        Ok(Box::new(ArrowArrayReader::try_new(
+                            *page_iterator,
                             column_desc,
                             converter,
                             arrow_type,
@@ -1728,7 +1728,7 @@
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::arrow::converter::Utf8Converter;
+    use crate::arrow::converter::{Utf8ArrayConverter, Utf8Converter};
     use crate::arrow::schema::parquet_to_arrow_schema;
     use crate::basic::{Encoding, Type as PhysicalType};
     use crate::column::page::{Page, PageReader};
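Schematically, the hunk above swaps the UTF8 path from the converter-based reader to the new one; a sketch using the same constructors the benchmark exercises (`page_iterator` and `column_desc` assumed in hand):

```rust
// before: generic object reader driven by a Utf8 converter
let converter = Utf8Converter::new(Utf8ArrayConverter {});
let reader = ComplexObjectArrayReader::<ByteArrayType, Utf8Converter>::new(
    Box::new(page_iterator),
    column_desc,
    converter,
    None,
)
.unwrap();

// after: ArrowArrayReader decodes straight into arrow buffers
let converter = StringArrayConverter::new();
let reader =
    ArrowArrayReader::try_new(page_iterator, column_desc, converter, None).unwrap();
```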
diff --git a/parquet/src/arrow/arrow_array_reader.rs b/parquet/src/arrow/arrow_array_reader.rs
new file mode 100644
index 0000000..c06d872
--- /dev/null
+++ b/parquet/src/arrow/arrow_array_reader.rs
@@ -0,0 +1,1562 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use super::array_reader::ArrayReader;
+use crate::arrow::schema::parquet_to_arrow_field;
+use crate::basic::Encoding;
+use crate::errors::{ParquetError, Result};
+use crate::{
+    column::page::{Page, PageIterator},
+    memory::ByteBufferPtr,
+    schema::types::{ColumnDescPtr, ColumnDescriptor},
+};
+use arrow::{
+    array::{ArrayRef, Int16Array},
+    buffer::MutableBuffer,
+    datatypes::{DataType as ArrowType, ToByteSlice},
+};
+use std::{any::Any, collections::VecDeque, marker::PhantomData};
+use std::{cell::RefCell, rc::Rc};
+
+struct UnzipIter<Source, Target, State> {
+    shared_state: Rc<RefCell<State>>,
+    select_item_buffer: fn(&mut State) -> &mut VecDeque<Target>,
+    consume_source_item: fn(source_item: Source, state: &mut State) -> Target,
+}
+
+impl<Source, Target, State> UnzipIter<Source, Target, State> {
+    fn new(
+        shared_state: Rc<RefCell<State>>,
+        item_buffer_selector: fn(&mut State) -> &mut VecDeque<Target>,
+        source_item_consumer: fn(source_item: Source, state: &mut State) -> Target,
+    ) -> Self {
+        Self {
+            shared_state,
+            select_item_buffer: item_buffer_selector,
+            consume_source_item: source_item_consumer,
+        }
+    }
+}
+
+trait UnzipIterState<T> {
+    type SourceIter: Iterator<Item = T>;
+    fn source_iter(&mut self) -> &mut Self::SourceIter;
+}
+
+impl<Source, Target, State: UnzipIterState<Source>> Iterator
+    for UnzipIter<Source, Target, State>
+{
+    type Item = Target;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let mut inner = self.shared_state.borrow_mut();
+        // try to get one from the stored data
+        (self.select_item_buffer)(&mut *inner)
+            .pop_front()
+            .or_else(||
+            // nothing stored, we need a new element.
+            inner.source_iter().next().map(|s| {
+                (self.consume_source_item)(s, &mut inner)
+            }))
+    }
+}
+
+struct PageBufferUnzipIterState<V, L, It> {
+    iter: It,
+    value_iter_buffer: VecDeque<V>,
+    def_level_iter_buffer: VecDeque<L>,
+    rep_level_iter_buffer: VecDeque<L>,
+}
+
+impl<V, L, It: Iterator<Item = (V, L, L)>> UnzipIterState<(V, L, L)>
+    for PageBufferUnzipIterState<V, L, It>
+{
+    type SourceIter = It;
+
+    #[inline]
+    fn source_iter(&mut self) -> &mut Self::SourceIter {
+        &mut self.iter
+    }
+}
+
+type ValueUnzipIter<V, L, It> =
+    UnzipIter<(V, L, L), V, PageBufferUnzipIterState<V, L, It>>;
+type LevelUnzipIter<V, L, It> =
+    UnzipIter<(V, L, L), L, PageBufferUnzipIterState<V, L, It>>;
+type PageUnzipResult<V, L, It> = (
+    ValueUnzipIter<V, L, It>,
+    LevelUnzipIter<V, L, It>,
+    LevelUnzipIter<V, L, It>,
+);
+
+fn unzip_iter<V, L, It: Iterator<Item = (V, L, L)>>(it: It) -> PageUnzipResult<V, L, It> {
+    let shared_data = Rc::new(RefCell::new(PageBufferUnzipIterState {
+        iter: it,
+        value_iter_buffer: VecDeque::new(),
+        def_level_iter_buffer: VecDeque::new(),
+        rep_level_iter_buffer: VecDeque::new(),
+    }));
+
+    let value_iter = UnzipIter::new(
+        shared_data.clone(),
+        |state| &mut state.value_iter_buffer,
+        |(v, d, r), state| {
+            state.def_level_iter_buffer.push_back(d);
+            state.rep_level_iter_buffer.push_back(r);
+            v
+        },
+    );
+
+    let def_level_iter = UnzipIter::new(
+        shared_data.clone(),
+        |state| &mut state.def_level_iter_buffer,
+        |(v, d, r), state| {
+            state.value_iter_buffer.push_back(v);
+            state.rep_level_iter_buffer.push_back(r);
+            d
+        },
+    );
+
+    let rep_level_iter = UnzipIter::new(
+        shared_data,
+        |state| &mut state.rep_level_iter_buffer,
+        |(v, d, r), state| {
+            state.value_iter_buffer.push_back(v);
+            state.def_level_iter_buffer.push_back(d);
+            r
+        },
+    );
+
+    (value_iter, def_level_iter, rep_level_iter)
+}
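+
+// Example of the unzip contract (illustrative only, not part of the API):
+// given an iterator of (value, def_level, rep_level) tuples, the three
+// returned iterators yield the same elements independently; items consumed
+// through one iterator are buffered for the other two in the shared state.
+//
+//   let (values, defs, reps) = unzip_iter(vec![(1, 1i16, 0i16)].into_iter());
+//   assert_eq!(values.collect::<Vec<_>>(), vec![1]);
+//   assert_eq!(defs.collect::<Vec<_>>(), vec![1]);
+//   assert_eq!(reps.collect::<Vec<_>>(), vec![0]);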
+
+pub trait ArrayConverter {
+    fn convert_value_bytes(
+        &self,
+        value_decoder: &mut impl ValueDecoder,
+        num_values: usize,
+    ) -> Result<arrow::array::ArrayData>;
+}
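+
+// ArrayConverter is the extension point of ArrowArrayReader: this module
+// implements it for PrimitiveArrayConverter and StringArrayConverter, which
+// decode raw value bytes straight into arrow ArrayData buffers.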
+
+pub struct ArrowArrayReader<'a, C: ArrayConverter + 'a> {
+    column_desc: ColumnDescPtr,
+    data_type: ArrowType,
+    def_level_decoder: Box<dyn ValueDecoder + 'a>,
+    rep_level_decoder: Box<dyn ValueDecoder + 'a>,
+    value_decoder: Box<dyn ValueDecoder + 'a>,
+    last_def_levels: Option<Int16Array>,
+    last_rep_levels: Option<Int16Array>,
+    array_converter: C,
+}
+
+pub(crate) struct ColumnChunkContext {
+    dictionary_values: Option<Vec<ByteBufferPtr>>,
+}
+
+impl ColumnChunkContext {
+    fn new() -> Self {
+        Self {
+            dictionary_values: None,
+        }
+    }
+
+    fn set_dictionary(&mut self, dictionary_values: Vec<ByteBufferPtr>) {
+        self.dictionary_values = Some(dictionary_values);
+    }
+}
+
+type PageDecoderTuple = (
+    Box<dyn ValueDecoder>,
+    Box<dyn ValueDecoder>,
+    Box<dyn ValueDecoder>,
+);
+
+impl<'a, C: ArrayConverter + 'a> ArrowArrayReader<'a, C> {
+    pub fn try_new<P: PageIterator + 'a>(
+        column_chunk_iterator: P,
+        column_desc: ColumnDescPtr,
+        array_converter: C,
+        arrow_type: Option<ArrowType>,
+    ) -> Result<Self> {
+        let data_type = match arrow_type {
+            Some(t) => t,
+            None => parquet_to_arrow_field(column_desc.as_ref())?
+                .data_type()
+                .clone(),
+        };
+        type PageIteratorItem = Result<(Page, Rc<RefCell<ColumnChunkContext>>)>;
+        let page_iter = column_chunk_iterator
+            // build iterator of pages across column chunks
+            .flat_map(|x| -> Box<dyn Iterator<Item = PageIteratorItem>> {
+                // attach column chunk context
+                let context = Rc::new(RefCell::new(ColumnChunkContext::new()));
+                match x {
+                    Ok(page_reader) => Box::new(
+                        page_reader.map(move |pr| pr.map(|p| (p, context.clone()))),
+                    ),
+                    // errors from reading column chunks / row groups are propagated to page level
+                    Err(e) => Box::new(std::iter::once(Err(e))),
+                }
+            });
+        // capture a clone of column_desc in the closure so that it can outlive the current function
+        let map_page_fn_factory = |column_desc: ColumnDescPtr| {
+            move |x: Result<(Page, Rc<RefCell<ColumnChunkContext>>)>| {
+                x.and_then(|(page, context)| {
+                    Self::map_page(page, context, column_desc.as_ref())
+                })
+            }
+        };
+        let map_page_fn = map_page_fn_factory(column_desc.clone());
+        // map page iterator into tuple of buffer iterators for (values, def levels, rep levels)
+        // errors from lower levels are surfaced through the value decoder iterator
+        let decoder_iter = page_iter.map(map_page_fn).map(|x| match x {
+            Ok(iter_tuple) => iter_tuple,
+            // errors from reading pages are propagated to decoder iterator level
+            Err(e) => Self::map_page_error(e),
+        });
+        // split tuple iterator into separate iterators for (values, def levels, rep levels)
+        let (value_iter, def_level_iter, rep_level_iter) = unzip_iter(decoder_iter);
+
+        Ok(Self {
+            column_desc,
+            data_type,
+            def_level_decoder: Box::new(CompositeValueDecoder::new(def_level_iter)),
+            rep_level_decoder: Box::new(CompositeValueDecoder::new(rep_level_iter)),
+            value_decoder: Box::new(CompositeValueDecoder::new(value_iter)),
+            last_def_levels: None,
+            last_rep_levels: None,
+            array_converter,
+        })
+    }
+
+    #[inline]
+    fn def_levels_available(column_desc: &ColumnDescriptor) -> bool {
+        column_desc.max_def_level() > 0
+    }
+
+    #[inline]
+    fn rep_levels_available(column_desc: &ColumnDescriptor) -> bool {
+        column_desc.max_rep_level() > 0
+    }
+
+    fn map_page_error(err: ParquetError) -> PageDecoderTuple {
+        (
+            Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
+            Box::new(<dyn ValueDecoder>::once(Err(err.clone()))),
+            Box::new(<dyn ValueDecoder>::once(Err(err))),
+        )
+    }
+
+    // Split Result<Page> into Result<(Iterator<Values>, Iterator<DefLevels>, Iterator<RepLevels>)>
+    // this method could fail, e.g. if the page encoding is not supported
+    fn map_page(
+        page: Page,
+        column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
+        column_desc: &ColumnDescriptor,
+    ) -> Result<PageDecoderTuple> {
+        use crate::encodings::levels::LevelDecoder;
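+        // Data page layout differs between versions: in v1 pages the buffer
+        // holds [rep levels][def levels][values] back to back, so each level
+        // decoder reports how many bytes it consumed and the buffer pointer
+        // is advanced past them; v2 pages carry the level byte lengths in the
+        // page header instead.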
+        match page {
+            Page::DictionaryPage {
+                buf,
+                num_values,
+                encoding,
+                ..
+            } => {
+                let mut column_chunk_context = column_chunk_context.borrow_mut();
+                if column_chunk_context.dictionary_values.is_some() {
+                    return Err(general_err!(
+                        "Column chunk cannot have more than one dictionary"
+                    ));
+                }
+                // create plain decoder for dictionary values
+                let mut dict_decoder = Self::get_dictionary_page_decoder(
+                    buf,
+                    num_values as usize,
+                    encoding,
+                    column_desc,
+                )?;
+                // decode and cache dictionary values
+                let dictionary_values = dict_decoder.read_dictionary_values()?;
+                column_chunk_context.set_dictionary(dictionary_values);
+
+                // a dictionary page doesn't return any values
+                Ok((
+                    Box::new(<dyn ValueDecoder>::empty()),
+                    Box::new(<dyn ValueDecoder>::empty()),
+                    Box::new(<dyn ValueDecoder>::empty()),
+                ))
+            }
+            Page::DataPage {
+                buf,
+                num_values,
+                encoding,
+                def_level_encoding,
+                rep_level_encoding,
+                statistics: _,
+            } => {
+                let mut buffer_ptr = buf;
+                // create rep level decoder iterator
+                let rep_level_iter: Box<dyn ValueDecoder> =
+                    if Self::rep_levels_available(&column_desc) {
+                        let mut rep_decoder = LevelDecoder::v1(
+                            rep_level_encoding,
+                            column_desc.max_rep_level(),
+                        );
+                        let rep_level_byte_len =
+                            rep_decoder.set_data(num_values as usize, buffer_ptr.all());
+                        // advance buffer pointer
+                        buffer_ptr = buffer_ptr.start_from(rep_level_byte_len);
+                        Box::new(LevelValueDecoder::new(rep_decoder))
+                    } else {
+                        Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
+                            "rep levels are not available".to_string(),
+                        ))))
+                    };
+                // create def level decoder iterator
+                let def_level_iter: Box<dyn ValueDecoder> =
+                    if Self::def_levels_available(&column_desc) {
+                        let mut def_decoder = LevelDecoder::v1(
+                            def_level_encoding,
+                            column_desc.max_def_level(),
+                        );
+                        let def_levels_byte_len =
+                            def_decoder.set_data(num_values as usize, buffer_ptr.all());
+                        // advance buffer pointer
+                        buffer_ptr = buffer_ptr.start_from(def_levels_byte_len);
+                        Box::new(LevelValueDecoder::new(def_decoder))
+                    } else {
+                        Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
+                            "def levels are not available".to_string(),
+                        ))))
+                    };
+                // create value decoder iterator
+                let value_iter = Self::get_value_decoder(
+                    buffer_ptr,
+                    num_values as usize,
+                    encoding,
+                    column_desc,
+                    column_chunk_context,
+                )?;
+                Ok((value_iter, def_level_iter, rep_level_iter))
+            }
+            Page::DataPageV2 {
+                buf,
+                num_values,
+                encoding,
+                num_nulls: _,
+                num_rows: _,
+                def_levels_byte_len,
+                rep_levels_byte_len,
+                is_compressed: _,
+                statistics: _,
+            } => {
+                let mut offset = 0;
+                // create rep level decoder iterator
+                let rep_level_iter: Box<dyn ValueDecoder> =
+                    if Self::rep_levels_available(&column_desc) {
+                        let rep_levels_byte_len = rep_levels_byte_len as usize;
+                        let mut rep_decoder =
+                            LevelDecoder::v2(column_desc.max_rep_level());
+                        rep_decoder.set_data_range(
+                            num_values as usize,
+                            &buf,
+                            offset,
+                            rep_levels_byte_len,
+                        );
+                        offset += rep_levels_byte_len;
+                        Box::new(LevelValueDecoder::new(rep_decoder))
+                    } else {
+                        Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
+                            "rep levels are not available".to_string(),
+                        ))))
+                    };
+                // create def level decoder iterator
+                let def_level_iter: Box<dyn ValueDecoder> =
+                    if Self::def_levels_available(&column_desc) {
+                        let def_levels_byte_len = def_levels_byte_len as usize;
+                        let mut def_decoder =
+                            LevelDecoder::v2(column_desc.max_def_level());
+                        def_decoder.set_data_range(
+                            num_values as usize,
+                            &buf,
+                            offset,
+                            def_levels_byte_len,
+                        );
+                        offset += def_levels_byte_len;
+                        Box::new(LevelValueDecoder::new(def_decoder))
+                    } else {
+                        Box::new(<dyn ValueDecoder>::once(Err(ParquetError::General(
+                            "def levels are not available".to_string(),
+                        ))))
+                    };
+
+                // create value decoder iterator
+                let values_buffer = buf.start_from(offset);
+                let value_iter = Self::get_value_decoder(
+                    values_buffer,
+                    num_values as usize,
+                    encoding,
+                    column_desc,
+                    column_chunk_context,
+                )?;
+                Ok((value_iter, def_level_iter, rep_level_iter))
+            }
+        }
+    }
+
+    fn get_dictionary_page_decoder(
+        values_buffer: ByteBufferPtr,
+        num_values: usize,
+        mut encoding: Encoding,
+        column_desc: &ColumnDescriptor,
+    ) -> Result<Box<dyn DictionaryValueDecoder>> {
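+        // Per the Parquet spec the dictionary page itself is plain-encoded;
+        // PLAIN and the deprecated PLAIN_DICTIONARY are normalized to
+        // RLE_DICTIONARY so a single check covers all supported spellings.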
+        if encoding == Encoding::PLAIN || encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY
+        }
+
+        if encoding == Encoding::RLE_DICTIONARY {
+            Ok(
+                Self::get_plain_value_decoder(values_buffer, num_values, column_desc)
+                    .into_dictionary_decoder(),
+            )
+        } else {
+            Err(nyi_err!(
+                "Invalid/Unsupported encoding type for dictionary: {}",
+                encoding
+            ))
+        }
+    }
+
+    fn get_value_decoder(
+        values_buffer: ByteBufferPtr,
+        num_values: usize,
+        mut encoding: Encoding,
+        column_desc: &ColumnDescriptor,
+        column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
+    ) -> Result<Box<dyn ValueDecoder>> {
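+        // PLAIN_DICTIONARY is the deprecated Parquet 1.0 spelling of
+        // RLE_DICTIONARY for data pages, so the two are treated as equivalent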
+        if encoding == Encoding::PLAIN_DICTIONARY {
+            encoding = Encoding::RLE_DICTIONARY;
+        }
+
+        match encoding {
+            Encoding::PLAIN => {
+                Ok(
+                    Self::get_plain_value_decoder(values_buffer, num_values, column_desc)
+                        .into_value_decoder(),
+                )
+            }
+            Encoding::RLE_DICTIONARY => {
+                if column_chunk_context.borrow().dictionary_values.is_some() {
+                    let value_bit_len = Self::get_column_physical_bit_len(column_desc);
+                    let dictionary_decoder: Box<dyn ValueDecoder> = if value_bit_len == 0
+                    {
+                        Box::new(VariableLenDictionaryDecoder::new(
+                            column_chunk_context,
+                            values_buffer,
+                            num_values,
+                        ))
+                    } else {
+                        Box::new(FixedLenDictionaryDecoder::new(
+                            column_chunk_context,
+                            values_buffer,
+                            num_values,
+                            value_bit_len,
+                        ))
+                    };
+                    Ok(dictionary_decoder)
+                } else {
+                    Err(general_err!("Dictionary values have not been initialized."))
+                }
+            }
+            // Encoding::RLE => Box::new(RleValueDecoder::new()),
+            // Encoding::DELTA_BINARY_PACKED => Box::new(DeltaBitPackDecoder::new()),
+            // Encoding::DELTA_LENGTH_BYTE_ARRAY => Box::new(DeltaLengthByteArrayDecoder::new()),
+            // Encoding::DELTA_BYTE_ARRAY => Box::new(DeltaByteArrayDecoder::new()),
+            e => return Err(nyi_err!("Encoding {} is not supported", e)),
+        }
+    }
+
+    fn get_column_physical_bit_len(column_desc: &ColumnDescriptor) -> usize {
+        use crate::basic::Type as PhysicalType;
+        // parquet only supports a limited number of physical types
+        // later converters cast to a more specific arrow / logical type if necessary
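+        // a bit length of 0 is used as a sentinel for variable-length types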
+        match column_desc.physical_type() {
+            PhysicalType::BOOLEAN => 1,
+            PhysicalType::INT32 | PhysicalType::FLOAT => 32,
+            PhysicalType::INT64 | PhysicalType::DOUBLE => 64,
+            PhysicalType::INT96 => 96,
+            PhysicalType::BYTE_ARRAY => 0,
+            PhysicalType::FIXED_LEN_BYTE_ARRAY => column_desc.type_length() as usize * 8,
+        }
+    }
+
+    fn get_plain_value_decoder(
+        values_buffer: ByteBufferPtr,
+        num_values: usize,
+        column_desc: &ColumnDescriptor,
+    ) -> Box<dyn PlainValueDecoder> {
+        let value_bit_len = Self::get_column_physical_bit_len(column_desc);
+        if value_bit_len == 0 {
+            Box::new(VariableLenPlainDecoder::new(values_buffer, num_values))
+        } else {
+            Box::new(FixedLenPlainDecoder::new(
+                values_buffer,
+                num_values,
+                value_bit_len,
+            ))
+        }
+    }
+
+    fn build_level_array(
+        level_decoder: &mut impl ValueDecoder,
+        batch_size: usize,
+    ) -> Result<Int16Array> {
+        use arrow::datatypes::Int16Type;
+        let level_converter = PrimitiveArrayConverter::<Int16Type>::new();
+        let array_data =
+            level_converter.convert_value_bytes(level_decoder, batch_size)?;
+        Ok(Int16Array::from(array_data))
+    }
+}
+
+impl<C: ArrayConverter> ArrayReader for ArrowArrayReader<'static, C> {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn get_data_type(&self) -> &ArrowType {
+        &self.data_type
+    }
+
+    fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
+        if Self::rep_levels_available(&self.column_desc) {
+            // read rep levels if available
+            let rep_level_array =
+                Self::build_level_array(&mut self.rep_level_decoder, batch_size)?;
+            self.last_rep_levels = Some(rep_level_array);
+        }
+
+        // check if def levels are available
+        let (values_to_read, null_bitmap_array) =
+            if !Self::def_levels_available(&self.column_desc) {
+                // if no def levels - just read (up to) batch_size values
+                (batch_size, None)
+            } else {
+                // if def levels are available - they determine how many values will be read
+                // decode def levels, return first error if any
+                let def_level_array =
+                    Self::build_level_array(&mut self.def_level_decoder, batch_size)?;
+                let def_level_count = def_level_array.len();
+                // use eq_scalar to efficiently build null bitmap array from def levels
+                let null_bitmap_array = arrow::compute::eq_scalar(
+                    &def_level_array,
+                    self.column_desc.max_def_level(),
+                )?;
+                self.last_def_levels = Some(def_level_array);
+                // efficiently calculate values to read
+                let values_to_read = null_bitmap_array
+                    .values()
+                    .count_set_bits_offset(0, def_level_count);
+                let maybe_null_bitmap = if values_to_read != null_bitmap_array.len() {
+                    Some(null_bitmap_array)
+                } else {
+                    // shortcut if no NULLs
+                    None
+                };
+                (values_to_read, maybe_null_bitmap)
+            };
+
+        // read a batch of values
+        // converter only creates a no-null / all value array data
+        let mut value_array_data = self
+            .array_converter
+            .convert_value_bytes(&mut self.value_decoder, values_to_read)?;
+
+        if let Some(null_bitmap_array) = null_bitmap_array {
+            // Def levels produced a null bitmap, so insert NULL values efficiently using MutableArrayData.
+            // This copies the value bytes a second time, but keeps converter requirements simple.
+            // With a small number of NULLs, this amounts to only a few copies of large byte sequences.
+            let actual_batch_size = null_bitmap_array.len();
+            // use_nulls is false, because null_bitmap_array is already calculated and re-used
+            let mut mutable = arrow::array::MutableArrayData::new(
+                vec![&value_array_data],
+                false,
+                actual_batch_size,
+            );
+            // SlicesIterator yields (start, end) ranges of consecutive set bits; NULLs are inserted to fill any gaps between them
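+            // Worked example (hypothetical levels): with max_def_level = 1 and
+            // def levels [1, 0, 1, 1], the null bitmap is [true, false, true, true];
+            // SlicesIterator yields (0, 1) and (2, 4), and the gap at index 1 is
+            // filled with a NULL before the second slice of values is copied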
+            arrow::compute::SlicesIterator::new(&null_bitmap_array).for_each(
+                |(start, end)| {
+                    // the gap needs to be filled with NULLs
+                    if start > mutable.len() {
+                        let nulls_to_add = start - mutable.len();
+                        mutable.extend_nulls(nulls_to_add);
+                    }
+                    // fill values, adjust start and end with NULL count so far
+                    let nulls_added = mutable.null_count();
+                    mutable.extend(0, start - nulls_added, end - nulls_added);
+                },
+            );
+            // any remaining part is NULLs
+            if mutable.len() < actual_batch_size {
+                let nulls_to_add = actual_batch_size - mutable.len();
+                mutable.extend_nulls(nulls_to_add);
+            }
+
+            value_array_data = mutable
+                .into_builder()
+                .null_bit_buffer(null_bitmap_array.values().clone())
+                .build();
+        }
+        let mut array = arrow::array::make_array(value_array_data);
+        if array.data_type() != &self.data_type {
+            // cast array to self.data_type if necessary
+            array = arrow::compute::cast(&array, &self.data_type)?
+        }
+        Ok(array)
+    }
+
+    fn get_def_levels(&self) -> Option<&[i16]> {
+        self.last_def_levels.as_ref().map(|x| x.values())
+    }
+
+    fn get_rep_levels(&self) -> Option<&[i16]> {
+        self.last_rep_levels.as_ref().map(|x| x.values())
+    }
+}
+
+use crate::encodings::rle::RleDecoder;
+
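+/// Byte-oriented pull interface used by `ArrowArrayReader` converters: an
+/// implementation invokes `read_bytes` with a byte slice and the number of
+/// values that slice encodes, and returns the total number of values read.
+///
+/// A minimal usage sketch (illustrative only; `decoder` stands for any
+/// in-scope implementation and `collected` is a hypothetical sink):
+///
+/// ```ignore
+/// let mut collected: Vec<u8> = Vec::new();
+/// let values_read = decoder.read_value_bytes(1024, &mut |bytes, _num| {
+///     collected.extend_from_slice(bytes);
+/// })?;
+/// ```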
+pub trait ValueDecoder {
+    fn read_value_bytes(
+        &mut self,
+        num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize>;
+}
+
+trait DictionaryValueDecoder {
+    fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>>;
+}
+
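+/// Implemented by plain decoders that can act either as a value decoder or as
+/// a dictionary-page decoder; the `into_*` methods convert `Box<Self>` into
+/// the desired trait object, since Rust does not perform implicit trait
+/// upcasting on boxed values.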
+trait PlainValueDecoder: ValueDecoder + DictionaryValueDecoder {
+    fn into_value_decoder(self: Box<Self>) -> Box<dyn ValueDecoder>;
+    fn into_dictionary_decoder(self: Box<Self>) -> Box<dyn DictionaryValueDecoder>;
+}
+
+impl<T> PlainValueDecoder for T
+where
+    T: ValueDecoder + DictionaryValueDecoder + 'static,
+{
+    fn into_value_decoder(self: Box<T>) -> Box<dyn ValueDecoder> {
+        self
+    }
+
+    fn into_dictionary_decoder(self: Box<T>) -> Box<dyn DictionaryValueDecoder> {
+        self
+    }
+}
+
+impl dyn ValueDecoder {
+    fn empty() -> impl ValueDecoder {
+        SingleValueDecoder::new(Ok(0))
+    }
+
+    fn once(value: Result<usize>) -> impl ValueDecoder {
+        SingleValueDecoder::new(value)
+    }
+}
+
+impl ValueDecoder for Box<dyn ValueDecoder> {
+    #[inline]
+    fn read_value_bytes(
+        &mut self,
+        num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        self.as_mut().read_value_bytes(num_values, read_bytes)
+    }
+}
+
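+/// Returns the same pre-computed result on every call: `Ok(0)` models an
+/// empty decoder, while an `Err` value models a decoder that fails as soon
+/// as it is read from (e.g. when def or rep levels are not available).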
+struct SingleValueDecoder {
+    value: Result<usize>,
+}
+
+impl SingleValueDecoder {
+    fn new(value: Result<usize>) -> Self {
+        Self { value }
+    }
+}
+
+impl ValueDecoder for SingleValueDecoder {
+    fn read_value_bytes(
+        &mut self,
+        _num_values: usize,
+        _read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        self.value.clone()
+    }
+}
+
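+/// Chains per-page decoders into a single `ValueDecoder`: once the current
+/// decoder reports 0 values read, the next one is pulled from the iterator,
+/// so a single read can transparently span page boundaries.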
+struct CompositeValueDecoder<I: Iterator<Item = Box<dyn ValueDecoder>>> {
+    current_decoder: Option<Box<dyn ValueDecoder>>,
+    decoder_iter: I,
+}
+
+impl<I: Iterator<Item = Box<dyn ValueDecoder>>> CompositeValueDecoder<I> {
+    fn new(mut decoder_iter: I) -> Self {
+        let current_decoder = decoder_iter.next();
+        Self {
+            current_decoder,
+            decoder_iter,
+        }
+    }
+}
+
+impl<I: Iterator<Item = Box<dyn ValueDecoder>>> ValueDecoder
+    for CompositeValueDecoder<I>
+{
+    fn read_value_bytes(
+        &mut self,
+        num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        let mut values_to_read = num_values;
+        while values_to_read > 0 {
+            let value_decoder = match self.current_decoder.as_mut() {
+                Some(d) => d,
+                // no more decoders
+                None => break,
+            };
+            while values_to_read > 0 {
+                let values_read =
+                    value_decoder.read_value_bytes(values_to_read, read_bytes)?;
+                if values_read > 0 {
+                    values_to_read -= values_read;
+                } else {
+                    // no more values in current decoder
+                    self.current_decoder = self.decoder_iter.next();
+                    break;
+                }
+            }
+        }
+
+        Ok(num_values - values_to_read)
+    }
+}
+
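+/// Adapts the existing `LevelDecoder` to the `ValueDecoder` interface by
+/// decoding levels into an i16 buffer and exposing them as native-endian
+/// bytes, which `PrimitiveArrayConverter::<Int16Type>` consumes directly.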
+struct LevelValueDecoder {
+    level_decoder: crate::encodings::levels::LevelDecoder,
+    level_value_buffer: Vec<i16>,
+}
+
+impl LevelValueDecoder {
+    fn new(level_decoder: crate::encodings::levels::LevelDecoder) -> Self {
+        Self {
+            level_decoder,
+            level_value_buffer: vec![0i16; 2048],
+        }
+    }
+}
+
+impl ValueDecoder for LevelValueDecoder {
+    fn read_value_bytes(
+        &mut self,
+        num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        let value_size = std::mem::size_of::<i16>();
+        let mut total_values_read = 0;
+        while total_values_read < num_values {
+            let values_to_read = std::cmp::min(
+                num_values - total_values_read,
+                self.level_value_buffer.len(),
+            );
+            let values_read = match self
+                .level_decoder
+                .get(&mut self.level_value_buffer[..values_to_read])
+            {
+                Ok(values_read) => values_read,
+                Err(e) => return Err(e),
+            };
+            if values_read > 0 {
+                let level_value_bytes =
+                    &self.level_value_buffer.to_byte_slice()[..values_read * value_size];
+                read_bytes(level_value_bytes, values_read);
+                total_values_read += values_read;
+            } else {
+                break;
+            }
+        }
+        Ok(total_values_read)
+    }
+}
+
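+/// Plain decoder for fixed bit-width physical types, where values are stored
+/// contiguously and can be handed out as raw byte slices without copying.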
+pub(crate) struct FixedLenPlainDecoder {
+    data: ByteBufferPtr,
+    num_values: usize,
+    value_bit_len: usize,
+}
+
+impl FixedLenPlainDecoder {
+    pub(crate) fn new(
+        data: ByteBufferPtr,
+        num_values: usize,
+        value_bit_len: usize,
+    ) -> Self {
+        Self {
+            data,
+            num_values,
+            value_bit_len,
+        }
+    }
+}
+
+impl DictionaryValueDecoder for FixedLenPlainDecoder {
+    fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
+        let value_byte_len = self.value_bit_len / 8;
+        let available_values = self.data.len() / value_byte_len;
+        let values_to_read = std::cmp::min(available_values, self.num_values);
+        let byte_len = values_to_read * value_byte_len;
+        let values = vec![self.data.range(0, byte_len)];
+        self.num_values = 0;
+        self.data.set_range(self.data.start(), 0);
+        Ok(values)
+    }
+}
+
+impl ValueDecoder for FixedLenPlainDecoder {
+    fn read_value_bytes(
+        &mut self,
+        num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        let available_values = self.data.len() * 8 / self.value_bit_len;
+        if available_values > 0 {
+            let values_to_read = std::cmp::min(available_values, num_values);
+            let byte_len = values_to_read * self.value_bit_len / 8;
+            read_bytes(&self.data.data()[..byte_len], values_to_read);
+            self.data
+                .set_range(self.data.start() + byte_len, self.data.len() - byte_len);
+            Ok(values_to_read)
+        } else {
+            Ok(0)
+        }
+    }
+}
+
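+/// Plain decoder for BYTE_ARRAY values, which the Parquet format lays out as
+/// a 4-byte little-endian length prefix followed by that many value bytes.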
+pub(crate) struct VariableLenPlainDecoder {
+    data: ByteBufferPtr,
+    num_values: usize,
+    position: usize,
+}
+
+impl VariableLenPlainDecoder {
+    pub(crate) fn new(data: ByteBufferPtr, num_values: usize) -> Self {
+        Self {
+            data,
+            num_values,
+            position: 0,
+        }
+    }
+}
+
+impl DictionaryValueDecoder for VariableLenPlainDecoder {
+    fn read_dictionary_values(&mut self) -> Result<Vec<ByteBufferPtr>> {
+        const LEN_SIZE: usize = std::mem::size_of::<u32>();
+        let data = self.data.data();
+        let data_len = data.len();
+        let values_to_read = self.num_values;
+        let mut values = Vec::with_capacity(values_to_read);
+        let mut values_read = 0;
+        while self.position < data_len && values_read < values_to_read {
+            let len: usize =
+                read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize;
+            self.position += LEN_SIZE;
+            if data_len < self.position + len {
+                return Err(eof_err!("Not enough bytes to decode"));
+            }
+            values.push(self.data.range(self.position, len));
+            self.position += len;
+            values_read += 1;
+        }
+        self.num_values -= values_read;
+        Ok(values)
+    }
+}
+
+impl ValueDecoder for VariableLenPlainDecoder {
+    fn read_value_bytes(
+        &mut self,
+        num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        const LEN_SIZE: usize = std::mem::size_of::<u32>();
+        let data = self.data.data();
+        let data_len = data.len();
+        let values_to_read = std::cmp::min(self.num_values, num_values);
+        let mut values_read = 0;
+        while self.position < data_len && values_read < values_to_read {
+            let len: usize =
+                read_num_bytes!(u32, LEN_SIZE, data[self.position..]) as usize;
+            self.position += LEN_SIZE;
+            if data_len < self.position + len {
+                return Err(eof_err!("Not enough bytes to decode"));
+            }
+            read_bytes(&data[self.position..][..len], 1);
+            self.position += len;
+            values_read += 1;
+        }
+        self.num_values -= values_read;
+        Ok(values_read)
+    }
+}
+
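+/// Decodes RLE_DICTIONARY-encoded keys for fixed-width values: the first byte
+/// of the page data holds the key bit width, followed by the RLE / bit-packed
+/// hybrid encoded keys; each key indexes into the dictionary values stored in
+/// the shared `ColumnChunkContext`.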
+pub(crate) struct FixedLenDictionaryDecoder {
+    context_ref: Rc<RefCell<ColumnChunkContext>>,
+    key_data_buffer: ByteBufferPtr,
+    num_values: usize,
+    rle_decoder: RleDecoder,
+    value_byte_len: usize,
+    keys_buffer: Vec<i32>,
+}
+
+impl FixedLenDictionaryDecoder {
+    pub(crate) fn new(
+        column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
+        key_data_buffer: ByteBufferPtr,
+        num_values: usize,
+        value_bit_len: usize,
+    ) -> Self {
+        assert!(
+            value_bit_len % 8 == 0,
+            "value_bit_size must be a multiple of 8"
+        );
+        // First byte in `data` is bit width
+        let bit_width = key_data_buffer.data()[0];
+        let mut rle_decoder = RleDecoder::new(bit_width);
+        rle_decoder.set_data(key_data_buffer.start_from(1));
+
+        Self {
+            context_ref: column_chunk_context,
+            key_data_buffer,
+            num_values,
+            rle_decoder,
+            value_byte_len: value_bit_len / 8,
+            keys_buffer: vec![0; 2048],
+        }
+    }
+}
+
+impl ValueDecoder for FixedLenDictionaryDecoder {
+    fn read_value_bytes(
+        &mut self,
+        num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        if self.num_values == 0 {
+            return Ok(0);
+        }
+        let context = self.context_ref.borrow();
+        let values = context.dictionary_values.as_ref().unwrap();
+        let input_value_bytes = values[0].data();
+        // read no more than available values or requested values
+        let values_to_read = std::cmp::min(self.num_values, num_values);
+        let mut values_read = 0;
+        while values_read < values_to_read {
+            // read values in batches of up to self.keys_buffer.len()
+            let keys_to_read =
+                std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
+            let keys_read = match self
+                .rle_decoder
+                .get_batch(&mut self.keys_buffer[..keys_to_read])
+            {
+                Ok(keys_read) => keys_read,
+                Err(e) => return Err(e),
+            };
+            if keys_read == 0 {
+                self.num_values = 0;
+                return Ok(values_read);
+            }
+            for i in 0..keys_read {
+                let key = self.keys_buffer[i] as usize;
+                read_bytes(
+                    &input_value_bytes[key * self.value_byte_len..]
+                        [..self.value_byte_len],
+                    1,
+                );
+            }
+            values_read += keys_read;
+        }
+        self.num_values -= values_read;
+        Ok(values_read)
+    }
+}
+
+pub(crate) struct VariableLenDictionaryDecoder {
+    context_ref: Rc<RefCell<ColumnChunkContext>>,
+    key_data_buffer: ByteBufferPtr,
+    num_values: usize,
+    rle_decoder: RleDecoder,
+    keys_buffer: Vec<i32>,
+}
+
+impl VariableLenDictionaryDecoder {
+    pub(crate) fn new(
+        column_chunk_context: Rc<RefCell<ColumnChunkContext>>,
+        key_data_buffer: ByteBufferPtr,
+        num_values: usize,
+    ) -> Self {
+        // First byte in `data` is bit width
+        let bit_width = key_data_buffer.data()[0];
+        let mut rle_decoder = RleDecoder::new(bit_width);
+        rle_decoder.set_data(key_data_buffer.start_from(1));
+
+        Self {
+            context_ref: column_chunk_context,
+            key_data_buffer,
+            num_values,
+            rle_decoder,
+            keys_buffer: vec![0; 2048],
+        }
+    }
+}
+
+impl ValueDecoder for VariableLenDictionaryDecoder {
+    fn read_value_bytes(
+        &mut self,
+        num_values: usize,
+        read_bytes: &mut dyn FnMut(&[u8], usize),
+    ) -> Result<usize> {
+        if self.num_values == 0 {
+            return Ok(0);
+        }
+        let context = self.context_ref.borrow();
+        let values = context.dictionary_values.as_ref().unwrap();
+        let values_to_read = std::cmp::min(self.num_values, num_values);
+        let mut values_read = 0;
+        while values_read < values_to_read {
+            // read values in batches of up to self.keys_buffer.len()
+            let keys_to_read =
+                std::cmp::min(values_to_read - values_read, self.keys_buffer.len());
+            let keys_read = match self
+                .rle_decoder
+                .get_batch(&mut self.keys_buffer[..keys_to_read])
+            {
+                Ok(keys_read) => keys_read,
+                Err(e) => return Err(e),
+            };
+            if keys_read == 0 {
+                self.num_values = 0;
+                return Ok(values_read);
+            }
+            for i in 0..keys_read {
+                let key = self.keys_buffer[i] as usize;
+                read_bytes(values[key].data(), 1);
+            }
+            values_read += keys_read;
+        }
+        self.num_values -= values_read;
+        Ok(values_read)
+    }
+}
+
+use arrow::datatypes::ArrowPrimitiveType;
+
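+/// Converts raw value bytes from a `ValueDecoder` into primitive `ArrayData`
+/// by copying them into a single values buffer; no null bitmap is attached
+/// here, NULLs are inserted later by `ArrowArrayReader::next_batch`.
+///
+/// Usage sketch (illustrative only; `decoder` stands for any in-scope
+/// `ValueDecoder`):
+///
+/// ```ignore
+/// use arrow::datatypes::Int32Type;
+/// let converter = PrimitiveArrayConverter::<Int32Type>::new();
+/// let array_data = converter.convert_value_bytes(&mut decoder, 1024)?;
+/// let array = arrow::array::Int32Array::from(array_data);
+/// ```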
+pub struct PrimitiveArrayConverter<T: ArrowPrimitiveType> {
+    _phantom_data: PhantomData<T>,
+}
+
+impl<T: ArrowPrimitiveType> PrimitiveArrayConverter<T> {
+    pub fn new() -> Self {
+        Self {
+            _phantom_data: PhantomData,
+        }
+    }
+}
+
+impl<T: ArrowPrimitiveType> ArrayConverter for PrimitiveArrayConverter<T> {
+    fn convert_value_bytes(
+        &self,
+        value_decoder: &mut impl ValueDecoder,
+        num_values: usize,
+    ) -> Result<arrow::array::ArrayData> {
+        let value_size = T::get_byte_width();
+        let values_byte_capacity = num_values * value_size;
+        let mut values_buffer = MutableBuffer::new(values_byte_capacity);
+
+        value_decoder.read_value_bytes(num_values, &mut |value_bytes, _| {
+            values_buffer.extend_from_slice(value_bytes);
+        })?;
+
+        // calculate the actual value count, which may be less than the requested num_values
+        let value_count = values_buffer.len() / value_size;
+        let array_data = arrow::array::ArrayData::builder(T::DATA_TYPE)
+            .len(value_count)
+            .add_buffer(values_buffer.into())
+            .build();
+        Ok(array_data)
+    }
+}
+
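+/// Converts variable-length value bytes into `ArrayData` for a Utf8 array,
+/// building the i32 offsets buffer incrementally; because exactly one offset
+/// is pushed per value, each `read_bytes` callback must carry a single value.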
+pub struct StringArrayConverter {}
+
+impl StringArrayConverter {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl ArrayConverter for StringArrayConverter {
+    fn convert_value_bytes(
+        &self,
+        value_decoder: &mut impl ValueDecoder,
+        num_values: usize,
+    ) -> Result<arrow::array::ArrayData> {
+        use arrow::datatypes::ArrowNativeType;
+        let offset_size = std::mem::size_of::<i32>();
+        let mut offsets_buffer = MutableBuffer::new((num_values + 1) * offset_size);
+        // allocate initial capacity of 1 byte for each item
+        let values_byte_capacity = num_values;
+        let mut values_buffer = MutableBuffer::new(values_byte_capacity);
+
+        let mut length_so_far = i32::default();
+        offsets_buffer.push(length_so_far);
+
+        value_decoder.read_value_bytes(num_values, &mut |value_bytes, values_read| {
+            debug_assert_eq!(
+                values_read, 1,
+                "offset length value buffers can only contain bytes for a single value"
+            );
+            length_so_far +=
+                <i32 as ArrowNativeType>::from_usize(value_bytes.len()).unwrap();
+            // safe: the offsets buffer was allocated for num_values + 1 offsets
+            // and a ValueDecoder reads at most num_values values
+            unsafe {
+                offsets_buffer.push_unchecked(length_so_far);
+            }
+            values_buffer.extend_from_slice(value_bytes);
+        })?;
+        // calculate the actual data length, which may be less than the requested num_values
+        let data_len = (offsets_buffer.len() / offset_size) - 1;
+        let array_data = arrow::array::ArrayData::builder(ArrowType::Utf8)
+            .len(data_len)
+            .add_buffer(offsets_buffer.into())
+            .add_buffer(values_buffer.into())
+            .build();
+        Ok(array_data)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::column::page::Page;
+    use crate::data_type::ByteArray;
+    use crate::data_type::ByteArrayType;
+    use crate::schema::parser::parse_message_type;
+    use crate::schema::types::SchemaDescriptor;
+    use crate::util::test_common::page_util::{
+        DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
+    };
+    use crate::{
+        basic::Encoding, column::page::PageReader, schema::types::SchemaDescPtr,
+    };
+    use arrow::array::{PrimitiveArray, StringArray};
+    use arrow::datatypes::Int32Type as ArrowInt32;
+    use rand::{distributions::uniform::SampleUniform, thread_rng, Rng};
+    use std::sync::Arc;
+
+    /// Iterator for testing reading empty columns
+    struct EmptyPageIterator {
+        schema: SchemaDescPtr,
+    }
+
+    impl EmptyPageIterator {
+        fn new(schema: SchemaDescPtr) -> Self {
+            EmptyPageIterator { schema }
+        }
+    }
+
+    impl Iterator for EmptyPageIterator {
+        type Item = Result<Box<dyn PageReader>>;
+
+        fn next(&mut self) -> Option<Self::Item> {
+            None
+        }
+    }
+
+    impl PageIterator for EmptyPageIterator {
+        fn schema(&mut self) -> Result<SchemaDescPtr> {
+            Ok(self.schema.clone())
+        }
+
+        fn column_schema(&mut self) -> Result<ColumnDescPtr> {
+            Ok(self.schema.column(0))
+        }
+    }
+
+    #[test]
+    fn test_array_reader_empty_pages() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REQUIRED INT32 leaf;
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let column_desc = schema.column(0);
+        let page_iterator = EmptyPageIterator::new(schema);
+
+        let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+        let mut array_reader =
+            ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+                .unwrap();
+
+        // expect no values to be read
+        let array = array_reader.next_batch(50).unwrap();
+        assert!(array.is_empty());
+    }
+
+    fn make_column_chunks<T: crate::data_type::DataType>(
+        column_desc: ColumnDescPtr,
+        encoding: Encoding,
+        num_levels: usize,
+        min_value: T::T,
+        max_value: T::T,
+        def_levels: &mut Vec<i16>,
+        rep_levels: &mut Vec<i16>,
+        values: &mut Vec<T::T>,
+        page_lists: &mut Vec<Vec<Page>>,
+        use_v2: bool,
+        num_chunks: usize,
+    ) where
+        T::T: PartialOrd + SampleUniform + Copy,
+    {
+        for _i in 0..num_chunks {
+            let mut pages = VecDeque::new();
+            let mut data = Vec::new();
+            let mut page_def_levels = Vec::new();
+            let mut page_rep_levels = Vec::new();
+
+            crate::util::test_common::make_pages::<T>(
+                column_desc.clone(),
+                encoding,
+                1,
+                num_levels,
+                min_value,
+                max_value,
+                &mut page_def_levels,
+                &mut page_rep_levels,
+                &mut data,
+                &mut pages,
+                use_v2,
+            );
+
+            def_levels.append(&mut page_def_levels);
+            rep_levels.append(&mut page_rep_levels);
+            values.append(&mut data);
+            page_lists.push(Vec::from(pages));
+        }
+    }
+
+    #[test]
+    fn test_primitive_array_reader_data() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REQUIRED INT32 leaf;
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let column_desc = schema.column(0);
+
+        // Construct page iterator
+        {
+            let mut data = Vec::new();
+            let mut page_lists = Vec::new();
+            make_column_chunks::<crate::data_type::Int32Type>(
+                column_desc.clone(),
+                Encoding::PLAIN,
+                100,
+                1,
+                200,
+                &mut Vec::new(),
+                &mut Vec::new(),
+                &mut data,
+                &mut page_lists,
+                true,
+                2,
+            );
+            let page_iterator =
+                InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+
+            let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+            let mut array_reader =
+                ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+                    .unwrap();
+
+            // Read first 50 values, which are all from the first column chunk
+            let array = array_reader.next_batch(50).unwrap();
+            let array = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+                .unwrap();
+
+            assert_eq!(
+                &PrimitiveArray::<ArrowInt32>::from(data[0..50].to_vec()),
+                array
+            );
+
+            // Read next 100 values, the first 50 ones are from the first column chunk,
+            // and the last 50 ones are from the second column chunk
+            let array = array_reader.next_batch(100).unwrap();
+            let array = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+                .unwrap();
+
+            assert_eq!(
+                &PrimitiveArray::<ArrowInt32>::from(data[50..150].to_vec()),
+                array
+            );
+
+            // Try to read 100 values, however there are only 50 values
+            let array = array_reader.next_batch(100).unwrap();
+            let array = array
+                .as_any()
+                .downcast_ref::<PrimitiveArray<ArrowInt32>>()
+                .unwrap();
+
+            assert_eq!(
+                &PrimitiveArray::<ArrowInt32>::from(data[150..200].to_vec()),
+                array
+            );
+        }
+    }
+
+    #[test]
+    fn test_primitive_array_reader_def_and_rep_levels() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL INT32 leaf;
+            }
+        }
+        ";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+
+        let column_desc = schema.column(0);
+
+        // Construct page iterator
+        {
+            let mut def_levels = Vec::new();
+            let mut rep_levels = Vec::new();
+            let mut page_lists = Vec::new();
+            make_column_chunks::<crate::data_type::Int32Type>(
+                column_desc.clone(),
+                Encoding::PLAIN,
+                100,
+                1,
+                200,
+                &mut def_levels,
+                &mut rep_levels,
+                &mut Vec::new(),
+                &mut page_lists,
+                true,
+                2,
+            );
+
+            let page_iterator =
+                InMemoryPageIterator::new(schema, column_desc.clone(), page_lists);
+
+            let converter = PrimitiveArrayConverter::<arrow::datatypes::Int32Type>::new();
+            let mut array_reader =
+                ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+                    .unwrap();
+
+            let mut accu_len: usize = 0;
+
+            // Read first 50 values, which are all from the first column chunk
+            let array = array_reader.next_batch(50).unwrap();
+            assert_eq!(
+                Some(&def_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_def_levels()
+            );
+            assert_eq!(
+                Some(&rep_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_rep_levels()
+            );
+            accu_len += array.len();
+
+            // Read next 100 values, the first 50 ones are from the first column chunk,
+            // and the last 50 ones are from the second column chunk
+            let array = array_reader.next_batch(100).unwrap();
+            assert_eq!(
+                Some(&def_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_def_levels()
+            );
+            assert_eq!(
+                Some(&rep_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_rep_levels()
+            );
+            accu_len += array.len();
+
+            // Try to read 100 values, however there are only 50 values
+            let array = array_reader.next_batch(100).unwrap();
+            assert_eq!(
+                Some(&def_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_def_levels()
+            );
+            assert_eq!(
+                Some(&rep_levels[accu_len..(accu_len + array.len())]),
+                array_reader.get_rep_levels()
+            );
+
+            assert_eq!(accu_len + array.len(), 200);
+        }
+    }
+
+    #[test]
+    fn test_arrow_array_reader_string() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+            REPEATED Group test_mid {
+                OPTIONAL BYTE_ARRAY leaf (UTF8);
+            }
+        }
+        ";
+        let num_pages = 2;
+        let values_per_page = 100;
+        let str_base = "Hello World";
+
+        let schema = parse_message_type(message_type)
+            .map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
+            .unwrap();
+        let column_desc = schema.column(0);
+        let max_def_level = column_desc.max_def_level();
+        let max_rep_level = column_desc.max_rep_level();
+
+        assert_eq!(max_def_level, 2);
+        assert_eq!(max_rep_level, 1);
+
+        let mut rng = thread_rng();
+        let mut pages: Vec<Vec<Page>> = Vec::new();
+
+        let mut rep_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut def_levels = Vec::with_capacity(num_pages * values_per_page);
+        let mut all_values = Vec::with_capacity(num_pages * values_per_page);
+
+        for i in 0..num_pages {
+            let mut values = Vec::with_capacity(values_per_page);
+
+            for _ in 0..values_per_page {
+                let def_level = rng.gen_range(0..max_def_level + 1);
+                let rep_level = rng.gen_range(0..max_rep_level + 1);
+                if def_level == max_def_level {
+                    let len = rng.gen_range(1..str_base.len());
+                    let slice = &str_base[..len];
+                    values.push(ByteArray::from(slice));
+                    all_values.push(Some(slice.to_string()));
+                } else {
+                    all_values.push(None)
+                }
+                rep_levels.push(rep_level);
+                def_levels.push(def_level)
+            }
+
+            let range = i * values_per_page..(i + 1) * values_per_page;
+            let mut pb =
+                DataPageBuilderImpl::new(column_desc.clone(), values.len() as u32, true);
+
+            pb.add_rep_levels(max_rep_level, &rep_levels.as_slice()[range.clone()]);
+            pb.add_def_levels(max_def_level, &def_levels.as_slice()[range]);
+            pb.add_values::<ByteArrayType>(Encoding::PLAIN, values.as_slice());
+
+            let data_page = pb.consume();
+            pages.push(vec![data_page]);
+        }
+
+        let page_iterator = InMemoryPageIterator::new(schema, column_desc.clone(), pages);
+        let converter = StringArrayConverter::new();
+        let mut array_reader =
+            ArrowArrayReader::try_new(page_iterator, column_desc, converter, None)
+                .unwrap();
+
+        let mut accu_len: usize = 0;
+
+        let array = array_reader.next_batch(values_per_page / 2).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        accu_len += array.len();
+
+        // Read next values_per_page values, the first values_per_page/2 ones are from the first column chunk,
+        // and the last values_per_page/2 ones are from the second column chunk
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
+        for i in 0..array.len() {
+            if array.is_valid(i) {
+                assert_eq!(
+                    all_values[i + accu_len].as_ref().unwrap().as_str(),
+                    strings.value(i)
+                )
+            } else {
+                assert_eq!(all_values[i + accu_len], None)
+            }
+        }
+        accu_len += array.len();
+
+        // Try to read values_per_page values, however there are only values_per_page/2 values
+        let array = array_reader.next_batch(values_per_page).unwrap();
+        assert_eq!(array.len(), values_per_page / 2);
+        assert_eq!(
+            Some(&def_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_def_levels()
+        );
+        assert_eq!(
+            Some(&rep_levels[accu_len..(accu_len + array.len())]),
+            array_reader.get_rep_levels()
+        );
+    }
+}
diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs
index b1aa39e..afb1fdc 100644
--- a/parquet/src/arrow/mod.rs
+++ b/parquet/src/arrow/mod.rs
@@ -49,10 +49,11 @@
 //!}
 //! ```
 
-pub(in crate::arrow) mod array_reader;
+pub mod array_reader;
+pub mod arrow_array_reader;
 pub mod arrow_reader;
 pub mod arrow_writer;
-pub(in crate::arrow) mod converter;
+pub mod converter;
 pub(in crate::arrow) mod levels;
 pub(in crate::arrow) mod record_reader;
 pub mod schema;
diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs
index 7e3b6a8..4dd7da9 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -464,6 +464,14 @@
         }
     }
 
+    impl Iterator for TestPageReader {
+        type Item = Result<Page>;
+
+        fn next(&mut self) -> Option<Self::Item> {
+            self.get_next_page().transpose()
+        }
+    }
+
     #[test]
     fn test_read_required_records() {
         // Construct column schema
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index b351578..b75d3b5 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -28,6 +28,7 @@
 /// List of supported pages.
 /// These are 1-to-1 mapped from the equivalent Thrift definitions, except `buf` which
 /// used to store uncompressed bytes of the page.
+#[derive(Clone)]
 pub enum Page {
     DataPage {
         buf: ByteBufferPtr,
@@ -188,7 +189,7 @@
 
 /// API for reading pages from a column chunk.
 /// This offers an iterator-like API to get the next page.
-pub trait PageReader {
+pub trait PageReader: Iterator<Item = Result<Page>> {
     /// Gets the next page in the column chunk associated with this reader.
     /// Returns `None` if there are no pages left.
     fn get_next_page(&mut self) -> Result<Option<Page>>;
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 1181565..63be17b 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -1353,4 +1353,12 @@
             Ok(self.pages.next())
         }
     }
+
+    impl Iterator for TestPageReader {
+        type Item = Result<Page>;
+
+        fn next(&mut self) -> Option<Self::Item> {
+            self.get_next_page().transpose()
+        }
+    }
 }
diff --git a/parquet/src/encodings/mod.rs b/parquet/src/encodings/mod.rs
index 33b1e23..6046dda 100644
--- a/parquet/src/encodings/mod.rs
+++ b/parquet/src/encodings/mod.rs
@@ -18,4 +18,4 @@
 pub mod decoding;
 pub mod encoding;
 pub mod levels;
-mod rle;
+pub(crate) mod rle;
diff --git a/parquet/src/errors.rs b/parquet/src/errors.rs
index 021c1f0..be1a221 100644
--- a/parquet/src/errors.rs
+++ b/parquet/src/errors.rs
@@ -22,7 +22,7 @@
 #[cfg(any(feature = "arrow", test))]
 use arrow::error::ArrowError;
 
-#[derive(Debug, PartialEq)]
+#[derive(Debug, PartialEq, Clone)]
 pub enum ParquetError {
     /// General Parquet error.
     /// Returned when code violates normal workflow of working with Parquet files.
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index 0877e62..a4d79a3 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -271,6 +271,14 @@
     }
 }
 
+impl<T: Read> Iterator for SerializedPageReader<T> {
+    type Item = Result<Page>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.get_next_page().transpose()
+    }
+}
+
 impl<T: Read> PageReader for SerializedPageReader<T> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
         while self.seen_num_values < self.total_num_values {
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index a931b95..900e2b5 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -46,7 +46,7 @@
 pub use self::util::memory;
 
 #[macro_use]
-mod util;
+pub mod util;
 #[cfg(any(feature = "arrow", test))]
 pub mod arrow;
 pub mod column;
diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs
index 03b2500..1aa8c26 100644
--- a/parquet/src/schema/types.rs
+++ b/parquet/src/schema/types.rs
@@ -757,11 +757,13 @@
     }
 
     /// Returns maximum definition level for this column.
+    #[inline]
     pub fn max_def_level(&self) -> i16 {
         self.max_def_level
     }
 
     /// Returns maximum repetition level for this column.
+    #[inline]
     pub fn max_rep_level(&self) -> i16 {
         self.max_rep_level
     }
diff --git a/parquet/src/util/memory.rs b/parquet/src/util/memory.rs
index 57d0c24..1642a4b 100644
--- a/parquet/src/util/memory.rs
+++ b/parquet/src/util/memory.rs
@@ -292,6 +292,7 @@
     }
 
     /// Returns slice of data in this buffer.
+    #[inline]
     pub fn data(&self) -> &[T] {
         &self.data[self.start..self.start + self.len]
     }
@@ -299,12 +300,20 @@
     /// Updates this buffer with new `start` position and length `len`.
     ///
     /// Range should be within current start position and length.
+    #[inline]
     pub fn with_range(mut self, start: usize, len: usize) -> Self {
-        assert!(start <= self.len);
-        assert!(start + len <= self.len);
+        self.set_range(start, len);
+        self
+    }
+
+    /// Updates this buffer with new `start` position and length `len`.
+    ///
+    /// Range should be within current start position and length.
+    #[inline]
+    pub fn set_range(&mut self, start: usize, len: usize) {
+        assert!(self.start <= start && start + len <= self.start + self.len);
         self.start = start;
         self.len = len;
-        self
     }
 
     /// Adds memory tracker to this buffer.
@@ -314,16 +323,19 @@
     }
 
     /// Returns start position of this buffer.
+    #[inline]
     pub fn start(&self) -> usize {
         self.start
     }
 
     /// Returns length of this buffer
+    #[inline]
     pub fn len(&self) -> usize {
         self.len
     }
 
     /// Returns whether this buffer is empty
+    #[inline]
     pub fn is_empty(&self) -> bool {
         self.len == 0
     }
@@ -393,6 +405,7 @@
 }
 
 impl AsRef<[u8]> for BufferPtr<u8> {
+    #[inline]
     fn as_ref(&self) -> &[u8] {
         &self.data[self.start..self.start + self.len]
     }
diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs
index af9a1aa..8f6d85d 100644
--- a/parquet/src/util/mod.rs
+++ b/parquet/src/util/mod.rs
@@ -22,6 +22,7 @@
 mod bit_packing;
 pub mod cursor;
 pub mod hash_util;
-
-#[cfg(test)]
-pub mod test_common;
+pub(crate) mod test_common;
+pub use self::test_common::page_util::{
+    DataPageBuilder, DataPageBuilderImpl, InMemoryPageIterator,
+};
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 2e0e8e9..581845a 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -32,7 +32,6 @@
 use std::collections::VecDeque;
 use std::mem;
 use std::sync::Arc;
-use std::vec::IntoIter;
 
 pub trait DataPageBuilder {
     fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
@@ -78,6 +77,9 @@
 
     // Adds levels to the buffer and returns the number of encoded bytes
     fn add_levels(&mut self, max_level: i16, levels: &[i16]) -> u32 {
+        if max_level <= 0 {
+            return 0;
+        }
         let size = max_buffer_size(Encoding::RLE, max_level, levels.len());
         let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, vec![0; size]);
         level_encoder.put(levels).expect("put() should be OK");
@@ -163,60 +165,65 @@
 }
 
 /// A utility page reader which stores pages in memory.
-pub struct InMemoryPageReader {
-    pages: Box<dyn Iterator<Item = Page>>,
+pub struct InMemoryPageReader<P: Iterator<Item = Page>> {
+    page_iter: P,
 }
 
-impl InMemoryPageReader {
-    pub fn new(pages: Vec<Page>) -> Self {
+impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
+    pub fn new(pages: impl IntoIterator<Item = Page, IntoIter = P>) -> Self {
         Self {
-            pages: Box::new(pages.into_iter()),
+            page_iter: pages.into_iter(),
         }
     }
 }
 
-impl PageReader for InMemoryPageReader {
+impl<P: Iterator<Item = Page>> PageReader for InMemoryPageReader<P> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
-        Ok(self.pages.next())
+        Ok(self.page_iter.next())
+    }
+}
+
+impl<P: Iterator<Item = Page>> Iterator for InMemoryPageReader<P> {
+    type Item = Result<Page>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.get_next_page().transpose()
     }
 }
 
 /// A utility page iterator which stores page readers in memory, used for tests.
-pub struct InMemoryPageIterator {
+#[derive(Clone)]
+pub struct InMemoryPageIterator<I: Iterator<Item = Vec<Page>>> {
     schema: SchemaDescPtr,
     column_desc: ColumnDescPtr,
-    page_readers: IntoIter<Box<dyn PageReader>>,
+    page_reader_iter: I,
 }
 
-impl InMemoryPageIterator {
+impl<I: Iterator<Item = Vec<Page>>> InMemoryPageIterator<I> {
     pub fn new(
         schema: SchemaDescPtr,
         column_desc: ColumnDescPtr,
-        pages: Vec<Vec<Page>>,
+        pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>,
     ) -> Self {
-        let page_readers = pages
-            .into_iter()
-            .map(|pages| Box::new(InMemoryPageReader::new(pages)) as Box<dyn PageReader>)
-            .collect::<Vec<Box<dyn PageReader>>>()
-            .into_iter();
-
         Self {
             schema,
             column_desc,
-            page_readers,
+            page_reader_iter: pages.into_iter(),
         }
     }
 }
 
-impl Iterator for InMemoryPageIterator {
+impl<I: Iterator<Item = Vec<Page>>> Iterator for InMemoryPageIterator<I> {
     type Item = Result<Box<dyn PageReader>>;
 
     fn next(&mut self) -> Option<Self::Item> {
-        self.page_readers.next().map(Ok)
+        self.page_reader_iter
+            .next()
+            .map(|x| Ok(Box::new(InMemoryPageReader::new(x)) as Box<dyn PageReader>))
     }
 }
 
-impl PageIterator for InMemoryPageIterator {
+impl<I: Iterator<Item = Vec<Page>>> PageIterator for InMemoryPageIterator<I> {
     fn schema(&mut self) -> Result<SchemaDescPtr> {
         Ok(self.schema.clone())
     }