PARQUET-1100: Introduce RecordReader interface to better support nested data, refactor parquet/arrow/reader

We did not have very consistent logic around reading values from leaf nodes versus reading semantic records where the repetition level is greater than zero. This introduces a reader class that reads from column chunks until it identifies the end of records. It also reads values (with spaces, if required by the schema) into internal buffers. This permitted a substantial refactoring and simplification of the code in parquet::arrow where we were handling the interpretation of batch reads as records manually.

As a follow-up patch, we should be able to take a collection of record readers from the same "tree" in a nested type, reassemble the intermediate Arrow structure, and deal with any redundant structure information in repetition and definition levels. This should allow a unification of our nested data read code path so that we can read arbitrary nested structures.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #398 from wesm/PARQUET-1100 and squashes the following commits:

9ea85d9 [Wes McKinney] Revert to const args
f4dc0fe [Wes McKinney] Make parquet::schema::Node non-copyable. Use const-refs instead of const-ptr for non-nullable argument
0d859cc [Wes McKinney] Code review comments, scrubbing some flakes
1368415 [Wes McKinney] Fix more MSVC warnings
eccb84c [Wes McKinney] Give macro more accurate name
0eaada0 [Wes McKinney] Use int64_t instead of int for batch sizes
79c3709 [Wes McKinney] Add documentation. Remove RecordReader from public API
8fa619b [Wes McKinney] Initialize memory in DecodeSpaced to avoid undefined behavior
5a0c860 [Wes McKinney] Remove non-repeated branch from DelimitRecords
c754e6e [Wes McKinney] Refactor to skip record delimiting for non-repeated data
ed2a03f [Wes McKinney] Move more code into TypedRecordReader
2e934e9 [Wes McKinney] Set some integers as const
58d3a0f [Wes McKinney] Do not index into levels arrays
b766371 [Wes McKinney] Add RecordReader::Reserve to preallocate, fixing perf regression. cpplint
1bf3e8f [Wes McKinney] Refactor to create stateful parquet::RecordReader class to better support nested data. Shift value buffering logic from parquet/arrow/reader into RecordReader. Fix bug described in PARQUET-1100
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1deab40..ca37b5f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -628,6 +628,7 @@
   src/parquet/types.cc
 
   src/parquet/arrow/reader.cc
+  src/parquet/arrow/record_reader.cc
   src/parquet/arrow/schema.cc
   src/parquet/arrow/writer.cc
 
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index e899e10..a54fb5d 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -17,6 +17,8 @@
 
 #include "benchmark/benchmark.h"
 
+#include <iostream>
+
 #include "parquet/arrow/reader.h"
 #include "parquet/arrow/writer.h"
 #include "parquet/column_reader.h"
@@ -30,13 +32,14 @@
 using arrow::BooleanBuilder;
 using arrow::NumericBuilder;
 
-#define ABORT_NOT_OK(s)                  \
-  do {                                   \
-    ::arrow::Status _s = (s);            \
-    if (ARROW_PREDICT_FALSE(!_s.ok())) { \
-      exit(-1);                          \
-    }                                    \
-  } while (0);
+#define EXIT_NOT_OK(s)                                        \
+  do {                                                        \
+    ::arrow::Status _s = (s);                                 \
+    if (ARROW_PREDICT_FALSE(!_s.ok())) {                      \
+      std::cout << "Exiting: " << _s.ToString() << std::endl; \
+      exit(EXIT_FAILURE);                                     \
+    }                                                         \
+  } while (0)
 
 namespace parquet {
 
@@ -101,12 +104,12 @@
     std::vector<uint8_t> valid_bytes(BENCHMARK_SIZE, 0);
     int n = {0};
     std::generate(valid_bytes.begin(), valid_bytes.end(), [&n] { return n++ % 2; });
-    ABORT_NOT_OK(builder.Append(vec.data(), vec.size(), valid_bytes.data()));
+    EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), valid_bytes.data()));
   } else {
-    ABORT_NOT_OK(builder.Append(vec.data(), vec.size(), nullptr));
+    EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), nullptr));
   }
   std::shared_ptr<::arrow::Array> array;
-  ABORT_NOT_OK(builder.Finish(&array));
+  EXIT_NOT_OK(builder.Finish(&array));
 
   auto field = ::arrow::field("column", type, nullable);
   auto schema = std::make_shared<::arrow::Schema>(
@@ -125,12 +128,12 @@
     int n = {0};
     std::generate(valid_bytes.begin(), valid_bytes.end(),
                   [&n] { return (n++ % 2) != 0; });
-    ABORT_NOT_OK(builder.Append(vec, valid_bytes));
+    EXIT_NOT_OK(builder.Append(vec, valid_bytes));
   } else {
-    ABORT_NOT_OK(builder.Append(vec));
+    EXIT_NOT_OK(builder.Append(vec));
   }
   std::shared_ptr<::arrow::Array> array;
-  ABORT_NOT_OK(builder.Finish(&array));
+  EXIT_NOT_OK(builder.Finish(&array));
 
   auto field = ::arrow::field("column", ::arrow::boolean(), nullable);
   auto schema = std::make_shared<::arrow::Schema>(
@@ -148,7 +151,7 @@
 
   while (state.KeepRunning()) {
     auto output = std::make_shared<InMemoryOutputStream>();
-    ABORT_NOT_OK(
+    EXIT_NOT_OK(
         WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
   }
   SetBytesProcessed<nullable, ParquetType>(state);
@@ -171,8 +174,7 @@
   std::vector<typename ParquetType::c_type> values(BENCHMARK_SIZE, 128);
   std::shared_ptr<::arrow::Table> table = TableFromVector<ParquetType>(values, nullable);
   auto output = std::make_shared<InMemoryOutputStream>();
-  ABORT_NOT_OK(
-      WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
+  EXIT_NOT_OK(WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
   std::shared_ptr<Buffer> buffer = output->GetBuffer();
 
   while (state.KeepRunning()) {
@@ -180,7 +182,7 @@
         ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
     FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
     std::shared_ptr<::arrow::Table> table;
-    ABORT_NOT_OK(filereader.ReadTable(&table));
+    EXIT_NOT_OK(filereader.ReadTable(&table));
   }
   SetBytesProcessed<nullable, ParquetType>(state);
 }
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index a1e3382..56b4770 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -53,6 +53,9 @@
 using arrow::default_memory_pool;
 using arrow::io::BufferReader;
 
+using arrow::test::randint;
+using arrow::test::random_is_valid;
+
 using ArrowId = ::arrow::Type;
 using ParquetType = parquet::Type;
 using parquet::schema::GroupNode;
@@ -366,6 +369,45 @@
   return std::static_pointer_cast<GroupNode>(node_);
 }
 
+void AssertArraysEqual(const Array& expected, const Array& actual) {
+  if (!actual.Equals(expected)) {
+    std::stringstream pp_result;
+    std::stringstream pp_expected;
+
+    EXPECT_OK(::arrow::PrettyPrint(actual, 0, &pp_result));
+    EXPECT_OK(::arrow::PrettyPrint(expected, 0, &pp_expected));
+    FAIL() << "Got: \n" << pp_result.str() << "\nExpected: \n" << pp_expected.str();
+  }
+}
+
+void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual) {
+  ASSERT_EQ(expected.num_chunks(), actual.num_chunks()) << "# chunks unequal";
+  if (!actual.Equals(expected)) {
+    std::stringstream pp_result;
+    std::stringstream pp_expected;
+
+    for (int i = 0; i < actual.num_chunks(); ++i) {
+      auto c1 = actual.chunk(i);
+      auto c2 = expected.chunk(i);
+      if (!c1->Equals(*c2)) {
+        EXPECT_OK(::arrow::PrettyPrint(*c1, 0, &pp_result));
+        EXPECT_OK(::arrow::PrettyPrint(*c2, 0, &pp_expected));
+        FAIL() << "Chunk " << i << " Got: " << pp_result.str()
+               << "\nExpected: " << pp_expected.str();
+      }
+    }
+  }
+}
+
+void AssertTablesEqual(const Table& expected, const Table& actual) {
+  ASSERT_EQ(expected.num_columns(), actual.num_columns());
+
+  for (int i = 0; i < actual.num_columns(); ++i) {
+    AssertChunkedEqual(*expected.column(i)->data(), *actual.column(i)->data());
+  }
+  ASSERT_TRUE(actual.Equals(expected));
+}
+
 template <typename TestType>
 class TestParquetIO : public ::testing::Test {
  public:
@@ -394,13 +436,14 @@
     ASSERT_NE(nullptr, out->get());
   }
 
-  void ReadAndCheckSingleColumnFile(::arrow::Array* values) {
-    std::shared_ptr<::arrow::Array> out;
+  void ReadAndCheckSingleColumnFile(const Array& values) {
+    std::shared_ptr<Array> out;
 
     std::unique_ptr<FileReader> reader;
     ReaderFromSink(&reader);
     ReadSingleColumnFile(std::move(reader), &out);
-    ASSERT_TRUE(values->Equals(out));
+
+    AssertArraysEqual(values, *out);
   }
 
   void ReadTableFromFile(std::unique_ptr<FileReader> reader,
@@ -440,7 +483,7 @@
     *out = MakeSimpleTable(parent_lists, nullable_parent_lists);
   }
 
-  void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+  void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) {
     std::shared_ptr<::arrow::Table> out;
     std::unique_ptr<FileReader> reader;
     ReaderFromSink(&reader);
@@ -452,13 +495,14 @@
     ASSERT_EQ(1, chunked_array->num_chunks());
     auto result = chunked_array->chunk(0);
 
-    ASSERT_TRUE(values->Equals(result));
+    AssertArraysEqual(*values, *result);
   }
 
   void CheckRoundTrip(const std::shared_ptr<Table>& table) {
     std::shared_ptr<Table> result;
     DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
-    ASSERT_TRUE(table->Equals(*result));
+
+    AssertTablesEqual(*table, *result);
   }
 
   template <typename ArrayType>
@@ -495,7 +539,7 @@
       MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
   this->WriteColumn(schema, values);
 
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
@@ -515,7 +559,8 @@
 
   std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
   ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+  AssertArraysEqual(*values, *chunked_array->chunk(0));
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
@@ -528,7 +573,7 @@
       MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
   this->WriteColumn(schema, values);
 
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) {
@@ -547,7 +592,7 @@
       MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL);
   this->WriteColumn(schema, dict_values);
 
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
@@ -558,12 +603,12 @@
 
   std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
-  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+  this->ReadAndCheckSingleColumnFile(*sliced_values);
 
   // Slice offset 1 higher
   sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
-  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+  this->ReadAndCheckSingleColumnFile(*sliced_values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) {
@@ -574,12 +619,12 @@
 
   std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
-  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+  this->ReadAndCheckSingleColumnFile(*sliced_values);
 
   // Slice offset 1 higher, thus different null bitmap.
   sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
-  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+  this->ReadAndCheckSingleColumnFile(*sliced_values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
@@ -636,7 +681,7 @@
   }
   ASSERT_OK_NO_THROW(writer.Close());
 
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
@@ -679,7 +724,8 @@
 
   std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
   ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+  AssertArraysEqual(*values, *chunked_array->chunk(0));
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
@@ -698,7 +744,7 @@
   }
   ASSERT_OK_NO_THROW(writer.Close());
 
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
@@ -763,7 +809,7 @@
   ASSERT_OK(builder.Append(val));
   std::shared_ptr<Array> values;
   ASSERT_OK(builder.Finish(&values));
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
@@ -850,7 +896,8 @@
 
   std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
   ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+  AssertArraysEqual(*values, *chunked_array->chunk(0));
 }
 
 using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
@@ -871,7 +918,8 @@
 
   std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
   ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+  AssertArraysEqual(*values, *chunked_array->chunk(0));
 }
 
 template <typename T>
@@ -1026,14 +1074,14 @@
       table, 1, table->num_rows(), {}, &result,
       ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build());
 
-  ASSERT_TRUE(table->Equals(*result));
+  AssertTablesEqual(*table, *result);
 
   // Cast nanaoseconds to microseconds and use INT64 physical type
   DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
   std::shared_ptr<Table> expected;
   MakeDateTimeTypesTable(&table, true);
 
-  ASSERT_TRUE(table->Equals(*result));
+  AssertTablesEqual(*table, *result);
 }
 
 TEST(TestArrowReadWrite, CoerceTimestamps) {
@@ -1097,13 +1145,14 @@
   DoSimpleRoundtrip(
       input, 1, input->num_rows(), {}, &milli_result,
       ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build());
-  ASSERT_TRUE(milli_result->Equals(*ex_milli_result));
+
+  AssertTablesEqual(*ex_milli_result, *milli_result);
 
   std::shared_ptr<Table> micro_result;
   DoSimpleRoundtrip(
       input, 1, input->num_rows(), {}, &micro_result,
       ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
-  ASSERT_TRUE(micro_result->Equals(*ex_micro_result));
+  AssertTablesEqual(*ex_micro_result, *micro_result);
 }
 
 TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
@@ -1213,7 +1262,7 @@
   std::shared_ptr<Table> result;
   DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
 
-  ASSERT_TRUE(result->Equals(*ex_table));
+  AssertTablesEqual(*ex_table, *result);
 }
 
 void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
@@ -1253,7 +1302,7 @@
   std::shared_ptr<Table> result;
   DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result);
 
-  ASSERT_TRUE(table->Equals(*result));
+  AssertTablesEqual(*table, *result);
 }
 
 TEST(TestArrowReadWrite, ReadSingleRowGroup) {
@@ -1328,7 +1377,92 @@
 
   auto ex_schema = std::make_shared<::arrow::Schema>(ex_fields);
   Table expected(ex_schema, ex_columns);
-  ASSERT_TRUE(result->Equals(expected));
+  AssertTablesEqual(expected, *result);
+}
+
+void MakeListTable(int num_rows, std::shared_ptr<Table>* out) {
+  ::arrow::Int32Builder offset_builder;
+
+  std::vector<int32_t> length_draws;
+  randint(num_rows, 0, 100, &length_draws);
+
+  std::vector<int32_t> offset_values;
+
+  // Make sure some of them are length 0
+  int32_t total_elements = 0;
+  for (size_t i = 0; i < length_draws.size(); ++i) {
+    if (length_draws[i] < 10) {
+      length_draws[i] = 0;
+    }
+    offset_values.push_back(total_elements);
+    total_elements += length_draws[i];
+  }
+  offset_values.push_back(total_elements);
+
+  std::vector<int8_t> value_draws;
+  randint<int8_t>(total_elements, 0, 100, &value_draws);
+
+  std::vector<bool> is_valid;
+  random_is_valid(total_elements, 0.1, &is_valid);
+
+  std::shared_ptr<Array> values, offsets;
+  ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(), is_valid,
+                                                      value_draws, &values);
+  ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values, &offsets);
+
+  std::shared_ptr<Array> list_array;
+  ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values, default_memory_pool(),
+                                           &list_array));
+
+  auto f1 = ::arrow::field("a", ::arrow::list(::arrow::int8()));
+  auto schema = ::arrow::schema({f1});
+  std::vector<std::shared_ptr<Array>> arrays = {list_array};
+  *out = std::make_shared<Table>(schema, arrays);
+}
+
+TEST(TestArrowReadWrite, ListLargeRecords) {
+  const int num_rows = 50;
+
+  std::shared_ptr<Table> table;
+  MakeListTable(num_rows, &table);
+
+  std::shared_ptr<Buffer> buffer;
+  WriteTableToBuffer(table, 1, 100, default_arrow_writer_properties(), &buffer);
+
+  std::unique_ptr<FileReader> reader;
+  ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+                              ::arrow::default_memory_pool(),
+                              ::parquet::default_reader_properties(), nullptr, &reader));
+
+  // Read everything
+  std::shared_ptr<Table> result;
+  ASSERT_OK_NO_THROW(reader->ReadTable(&result));
+  AssertTablesEqual(*table, *result);
+
+  ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+                              ::arrow::default_memory_pool(),
+                              ::parquet::default_reader_properties(), nullptr, &reader));
+
+  std::unique_ptr<ColumnReader> col_reader;
+  ASSERT_OK(reader->GetColumn(0, &col_reader));
+
+  auto expected = table->column(0)->data()->chunk(0);
+
+  std::vector<std::shared_ptr<Array>> pieces;
+  for (int i = 0; i < num_rows; ++i) {
+    std::shared_ptr<Array> piece;
+    ASSERT_OK(col_reader->NextBatch(1, &piece));
+    ASSERT_EQ(1, piece->length());
+    pieces.push_back(piece);
+  }
+  auto chunked = std::make_shared<::arrow::ChunkedArray>(pieces);
+
+  auto chunked_col =
+      std::make_shared<::arrow::Column>(table->schema()->field(0), chunked);
+  std::vector<std::shared_ptr<::arrow::Column>> columns = {chunked_col};
+  auto chunked_table = std::make_shared<Table>(table->schema(), columns);
+
+  ASSERT_TRUE(table->Equals(*chunked_table));
 }
 
 TEST(TestArrowWrite, CheckChunkSize) {
@@ -1359,6 +1493,7 @@
 
   void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
     nested_parquet_ = std::make_shared<InMemoryOutputStream>();
+
     writer_ = parquet::ParquetFileWriter::Open(nested_parquet_, schema,
                                                default_writer_properties());
     row_group_writer_ = writer_->AppendRowGroup(num_rows);
@@ -1397,7 +1532,6 @@
 
   void ValidateColumnArray(const ::arrow::Int32Array& array, size_t expected_nulls) {
     ValidateArray(array, expected_nulls);
-
     int j = 0;
     for (int i = 0; i < values_array_->length(); i++) {
       if (array.IsNull(i)) {
@@ -1515,15 +1649,19 @@
 
     int num_columns = num_trees * static_cast<int>((std::pow(num_children, tree_depth)));
 
-    std::vector<int16_t> def_levels(num_rows);
-    std::vector<int16_t> rep_levels(num_rows);
-    for (int i = 0; i < num_rows; i++) {
+    std::vector<int16_t> def_levels;
+    std::vector<int16_t> rep_levels;
+
+    int num_levels = 0;
+    while (num_levels < num_rows) {
       if (node_repetition == Repetition::REQUIRED) {
-        def_levels[i] = 0;  // all is required
+        def_levels.push_back(0);  // all are required
       } else {
-        def_levels[i] = i % tree_depth;  // all is optional
+        int16_t level = static_cast<int16_t>(num_levels % (tree_depth + 2));
+        def_levels.push_back(level);  // all are optional
       }
-      rep_levels[i] = 0;  // none is repeated
+      rep_levels.push_back(0);  // none is repeated
+      ++num_levels;
     }
 
     // Produce values for the columns
@@ -1675,7 +1813,7 @@
   const int num_trees = 10;
   const int depth = 5;
   const int num_children = 3;
-  int num_rows = SMALL_SIZE * depth;
+  int num_rows = SMALL_SIZE * (depth + 2);
   CreateMultiLevelNestedParquet(num_trees, depth, num_children, num_rows, GetParam());
   std::shared_ptr<Table> table;
   ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 8d5ea7e..5edc837 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -24,13 +24,18 @@
 #include <queue>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <vector>
 
 #include "arrow/api.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/parallel.h"
 
+#include "parquet/arrow/record_reader.h"
 #include "parquet/arrow/schema.h"
+#include "parquet/column_reader.h"
+#include "parquet/schema.h"
 #include "parquet/util/schema-util.h"
 
 using arrow::Array;
@@ -40,21 +45,28 @@
 using arrow::Int32Array;
 using arrow::ListArray;
 using arrow::StructArray;
+using arrow::TimestampArray;
 using arrow::MemoryPool;
 using arrow::PoolBuffer;
 using arrow::Status;
 using arrow::Table;
 
-using parquet::schema::NodePtr;
+using parquet::schema::Node;
 
 // Help reduce verbosity
 using ParquetReader = parquet::ParquetFileReader;
+using arrow::ParallelFor;
+
+using parquet::internal::RecordReader;
 
 namespace parquet {
 namespace arrow {
 
+using ::arrow::BitUtil::BytesForBits;
+
 constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
-constexpr int64_t kNanosecondsInADay = 86400LL * 1000LL * 1000LL * 1000LL;
+constexpr int64_t kMillisecondsInADay = 86400000LL;
+constexpr int64_t kNanosecondsInADay = kMillisecondsInADay * 1000LL * 1000LL;
 
 static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
   int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
@@ -66,47 +78,6 @@
 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
 // ----------------------------------------------------------------------
-// Helper for parallel for-loop
-
-template <class FUNCTION>
-Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
-  std::vector<std::thread> thread_pool;
-  thread_pool.reserve(nthreads);
-  std::atomic<int> task_counter(0);
-
-  std::mutex error_mtx;
-  bool error_occurred = false;
-  Status error;
-
-  for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
-    thread_pool.emplace_back(
-        [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() {
-          int task_id;
-          while (!error_occurred) {
-            task_id = task_counter.fetch_add(1);
-            if (task_id >= num_tasks) {
-              break;
-            }
-            Status s = func(task_id);
-            if (!s.ok()) {
-              std::lock_guard<std::mutex> lock(error_mtx);
-              error_occurred = true;
-              error = s;
-              break;
-            }
-          }
-        });
-  }
-  for (auto&& thread : thread_pool) {
-    thread.join();
-  }
-  if (error_occurred) {
-    return error;
-  }
-  return Status::OK();
-}
-
-// ----------------------------------------------------------------------
 // Iteration utilities
 
 // Abstraction to decouple row group iteration details from the ColumnReader,
@@ -120,7 +91,7 @@
 
   virtual ~FileColumnIterator() {}
 
-  virtual std::shared_ptr<::parquet::ColumnReader> Next() = 0;
+  virtual std::unique_ptr<::parquet::PageReader> NextChunk() = 0;
 
   const SchemaDescriptor* schema() const { return schema_; }
 
@@ -141,10 +112,10 @@
   explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader)
       : FileColumnIterator(column_index, reader), next_row_group_(0) {}
 
-  std::shared_ptr<::parquet::ColumnReader> Next() override {
-    std::shared_ptr<::parquet::ColumnReader> result;
+  std::unique_ptr<::parquet::PageReader> NextChunk() override {
+    std::unique_ptr<::parquet::PageReader> result;
     if (next_row_group_ < reader_->metadata()->num_row_groups()) {
-      result = reader_->RowGroup(next_row_group_)->Column(column_index_);
+      result = reader_->RowGroup(next_row_group_)->GetColumnPageReader(column_index_);
       next_row_group_++;
     } else {
       result = nullptr;
@@ -164,12 +135,13 @@
         row_group_number_(row_group_number),
         done_(false) {}
 
-  std::shared_ptr<::parquet::ColumnReader> Next() override {
+  std::unique_ptr<::parquet::PageReader> NextChunk() override {
     if (done_) {
       return nullptr;
     }
 
-    auto result = reader_->RowGroup(row_group_number_)->Column(column_index_);
+    auto result =
+        reader_->RowGroup(row_group_number_)->GetColumnPageReader(column_index_);
     done_ = true;
     return result;
   };
@@ -193,8 +165,9 @@
   Status ReadSchemaField(int i, std::shared_ptr<Array>* out);
   Status ReadSchemaField(int i, const std::vector<int>& indices,
                          std::shared_ptr<Array>* out);
-  Status GetReaderForNode(int index, const NodePtr& node, const std::vector<int>& indices,
-                          int16_t def_level, std::unique_ptr<ColumnReader::Impl>* out);
+  Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
+                          int16_t def_level,
+                          std::unique_ptr<ColumnReader::ColumnReaderImpl>* out);
   Status ReadColumn(int i, std::shared_ptr<Array>* out);
   Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
   Status GetSchema(const std::vector<int>& indices,
@@ -223,96 +196,54 @@
   int num_threads_;
 };
 
-typedef const int16_t* ValueLevelsPtr;
-
-class ColumnReader::Impl {
+class ColumnReader::ColumnReaderImpl {
  public:
-  virtual ~Impl() {}
-  virtual Status NextBatch(int batch_size, std::shared_ptr<Array>* out) = 0;
-  virtual Status GetDefLevels(ValueLevelsPtr* data, size_t* length) = 0;
-  virtual Status GetRepLevels(ValueLevelsPtr* data, size_t* length) = 0;
+  virtual ~ColumnReaderImpl() {}
+  virtual Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) = 0;
+  virtual Status GetDefLevels(const int16_t** data, size_t* length) = 0;
+  virtual Status GetRepLevels(const int16_t** data, size_t* length) = 0;
   virtual const std::shared_ptr<Field> field() = 0;
 };
 
 // Reader implementation for primitive arrays
-class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::Impl {
+class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
  public:
   PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
-      : pool_(pool),
-        input_(std::move(input)),
-        descr_(input_->descr()),
-        values_buffer_(pool),
-        def_levels_buffer_(pool),
-        rep_levels_buffer_(pool) {
-    DCHECK(NodeToField(input_->descr()->schema_node(), &field_).ok());
+      : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
+    record_reader_ = RecordReader::Make(descr_, pool_);
+    DCHECK(NodeToField(*input_->descr()->schema_node(), &field_).ok());
     NextRowGroup();
   }
 
   virtual ~PrimitiveImpl() {}
 
-  Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
+  Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) override;
 
-  template <typename ArrowType, typename ParquetType>
-  Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
+  template <typename ParquetType>
+  Status WrapIntoListArray(std::shared_ptr<Array>* array);
 
-  template <typename ArrowType>
-  Status ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out);
-
-  template <typename ArrowType>
-  Status ReadFLBABatch(int batch_size, int byte_width, std::shared_ptr<Array>* out);
-
-  template <typename ArrowType>
-  Status InitDataBuffer(int batch_size);
-  Status InitValidBits(int batch_size);
-  template <typename ArrowType, typename ParquetType>
-  Status ReadNullableBatch(TypedColumnReader<ParquetType>* reader, int16_t* def_levels,
-                           int16_t* rep_levels, int64_t values_to_read,
-                           int64_t* levels_read, int64_t* values_read);
-  template <typename ArrowType, typename ParquetType>
-  Status ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
-                              int64_t values_to_read, int64_t* levels_read);
-  Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels,
-                           int64_t total_values_read, std::shared_ptr<Array>* array);
-
-  Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
-  Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+  Status GetDefLevels(const int16_t** data, size_t* length) override;
+  Status GetRepLevels(const int16_t** data, size_t* length) override;
 
   const std::shared_ptr<Field> field() override { return field_; }
 
  private:
   void NextRowGroup();
 
-  template <typename InType, typename OutType>
-  struct can_copy_ptr {
-    static constexpr bool value =
-        std::is_same<InType, OutType>::value ||
-        (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
-         (sizeof(InType) == sizeof(OutType)));
-  };
-
   MemoryPool* pool_;
   std::unique_ptr<FileColumnIterator> input_;
   const ColumnDescriptor* descr_;
 
-  std::shared_ptr<::parquet::ColumnReader> column_reader_;
-  std::shared_ptr<Field> field_;
+  std::shared_ptr<RecordReader> record_reader_;
 
-  PoolBuffer values_buffer_;
-  PoolBuffer def_levels_buffer_;
-  PoolBuffer rep_levels_buffer_;
-  std::shared_ptr<PoolBuffer> data_buffer_;
-  uint8_t* data_buffer_ptr_;
-  std::shared_ptr<PoolBuffer> valid_bits_buffer_;
-  uint8_t* valid_bits_ptr_;
-  int64_t valid_bits_idx_;
-  int64_t null_count_;
+  std::shared_ptr<Field> field_;
 };
 
 // Reader implementation for struct array
-class PARQUET_NO_EXPORT StructImpl : public ColumnReader::Impl {
+class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
  public:
-  explicit StructImpl(const std::vector<std::shared_ptr<Impl>>& children,
-                      int16_t struct_def_level, MemoryPool* pool, const NodePtr& node)
+  explicit StructImpl(const std::vector<std::shared_ptr<ColumnReaderImpl>>& children,
+                      int16_t struct_def_level, MemoryPool* pool, const Node* node)
       : children_(children),
         struct_def_level_(struct_def_level),
         pool_(pool),
@@ -322,21 +253,21 @@
 
   virtual ~StructImpl() {}
 
-  Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
-  Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
-  Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+  Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) override;
+  Status GetDefLevels(const int16_t** data, size_t* length) override;
+  Status GetRepLevels(const int16_t** data, size_t* length) override;
   const std::shared_ptr<Field> field() override { return field_; }
 
  private:
-  std::vector<std::shared_ptr<Impl>> children_;
+  std::vector<std::shared_ptr<ColumnReaderImpl>> children_;
   int16_t struct_def_level_;
   MemoryPool* pool_;
   std::shared_ptr<Field> field_;
   PoolBuffer def_levels_buffer_;
 
-  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap,
-                              int64_t* null_count);
-  void InitField(const NodePtr& node, const std::vector<std::shared_ptr<Impl>>& children);
+  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
+  void InitField(const Node* node,
+                 const std::vector<std::shared_ptr<ColumnReaderImpl>>& children);
 };
 
 FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
@@ -347,26 +278,26 @@
 Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
   std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get()));
 
-  std::unique_ptr<ColumnReader::Impl> impl(new PrimitiveImpl(pool_, std::move(input)));
+  std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
+      new PrimitiveImpl(pool_, std::move(input)));
   *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
   return Status::OK();
 }
 
-Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
-                                          const std::vector<int>& indices,
-                                          int16_t def_level,
-                                          std::unique_ptr<ColumnReader::Impl>* out) {
+Status FileReader::Impl::GetReaderForNode(
+    int index, const Node* node, const std::vector<int>& indices, int16_t def_level,
+    std::unique_ptr<ColumnReader::ColumnReaderImpl>* out) {
   *out = nullptr;
 
   if (IsSimpleStruct(node)) {
-    const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node.get());
-    std::vector<std::shared_ptr<ColumnReader::Impl>> children;
+    const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node);
+    std::vector<std::shared_ptr<ColumnReader::ColumnReaderImpl>> children;
     for (int i = 0; i < group->field_count(); i++) {
-      std::unique_ptr<ColumnReader::Impl> child_reader;
+      std::unique_ptr<ColumnReader::ColumnReaderImpl> child_reader;
       // TODO(itaiin): Remove the -1 index hack when all types of nested reads
       // are supported. This currently just signals the lower level reader resolution
       // to abort
-      RETURN_NOT_OK(GetReaderForNode(index, group->field(i), indices, def_level + 1,
+      RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices, def_level + 1,
                                      &child_reader));
       if (child_reader != nullptr) {
         children.push_back(std::move(child_reader));
@@ -374,22 +305,22 @@
     }
 
     if (children.size() > 0) {
-      *out = std::unique_ptr<ColumnReader::Impl>(
+      *out = std::unique_ptr<ColumnReader::ColumnReaderImpl>(
           new StructImpl(children, def_level, pool_, node));
     }
   } else {
     // This should be a flat field case - translate the field index to
     // the correct column index by walking down to the leaf node
-    NodePtr walker = node;
+    const Node* walker = node;
     while (!walker->is_primitive()) {
       DCHECK(walker->is_group());
-      auto group = static_cast<GroupNode*>(walker.get());
+      auto group = static_cast<const GroupNode*>(walker);
       if (group->field_count() != 1) {
         return Status::NotImplemented("lists with structs are not supported.");
       }
-      walker = group->field(0);
+      walker = group->field(0).get();
     }
-    auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker.get());
+    auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker);
 
     // If the index of the column is found then a reader for the coliumn is needed.
     // Otherwise *out keeps the nullptr value.
@@ -417,8 +348,8 @@
                                          std::shared_ptr<Array>* out) {
   auto parquet_schema = reader_->metadata()->schema();
 
-  auto node = parquet_schema->group_node()->field(i);
-  std::unique_ptr<ColumnReader::Impl> reader_impl;
+  auto node = parquet_schema->group_node()->field(i).get();
+  std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
 
   RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, &reader_impl));
   if (reader_impl == nullptr) {
@@ -428,24 +359,28 @@
 
   std::unique_ptr<ColumnReader> reader(new ColumnReader(std::move(reader_impl)));
 
-  int64_t batch_size = 0;
-  for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
-    batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+  // TODO(wesm): This calculation doesn't make much sense when we have repeated
+  // schema nodes
+  int64_t records_to_read = 0;
+
+  const FileMetaData& metadata = *reader_->metadata();
+  for (int j = 0; j < metadata.num_row_groups(); j++) {
+    records_to_read += metadata.RowGroup(j)->ColumnChunk(i)->num_values();
   }
 
-  return reader->NextBatch(static_cast<int>(batch_size), out);
+  return reader->NextBatch(records_to_read, out);
 }
 
 Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
   std::unique_ptr<ColumnReader> flat_column_reader;
   RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
 
-  int64_t batch_size = 0;
+  int64_t records_to_read = 0;
   for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
-    batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+    records_to_read += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
   }
 
-  return flat_column_reader->NextBatch(static_cast<int>(batch_size), out);
+  return flat_column_reader->NextBatch(records_to_read, out);
 }
 
 Status FileReader::Impl::GetSchema(const std::vector<int>& indices,
@@ -472,16 +407,17 @@
   auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata,
                          this](int i) {
     int column_index = indices[i];
-    int64_t batch_size = rg_metadata->ColumnChunk(column_index)->num_values();
+    int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values();
 
     std::unique_ptr<FileColumnIterator> input(
         new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
 
-    std::unique_ptr<ColumnReader::Impl> impl(new PrimitiveImpl(pool_, std::move(input)));
+    std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
+        new PrimitiveImpl(pool_, std::move(input)));
     ColumnReader flat_column_reader(std::move(impl));
 
     std::shared_ptr<Array> array;
-    RETURN_NOT_OK(flat_column_reader.NextBatch(static_cast<int>(batch_size), &array));
+    RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array));
     columns[i] = std::make_shared<Column>(schema->field(i), array);
     return Status::OK();
   };
@@ -642,282 +578,12 @@
   return impl_->parquet_reader();
 }
 
-template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
-                                           int64_t values_to_read, int64_t* levels_read) {
-  using ArrowCType = typename ArrowType::c_type;
-  using ParquetCType = typename ParquetType::c_type;
+template <typename ParquetType>
+Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<Array>* array) {
+  const int16_t* def_levels = record_reader_->def_levels();
+  const int16_t* rep_levels = record_reader_->rep_levels();
+  const int64_t total_levels_read = record_reader_->levels_position();
 
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
-  auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
-  int64_t values_read;
-  PARQUET_CATCH_NOT_OK(*levels_read =
-                           reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
-                                             nullptr, values, &values_read));
-
-  ArrowCType* out_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
-  std::copy(values, values + values_read, out_ptr + valid_bits_idx_);
-  valid_bits_idx_ += values_read;
-
-  return Status::OK();
-}
-
-#define NONNULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType)               \
-  template <>                                                                    \
-  Status PrimitiveImpl::ReadNonNullableBatch<ArrowType, ParquetType>(            \
-      TypedColumnReader<ParquetType> * reader, int64_t values_to_read,           \
-      int64_t * levels_read) {                                                   \
-    int64_t values_read;                                                         \
-    CType* out_ptr = reinterpret_cast<CType*>(data_buffer_ptr_);                 \
-    PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(                       \
-                             static_cast<int>(values_to_read), nullptr, nullptr, \
-                             out_ptr + valid_bits_idx_, &values_read));          \
-                                                                                 \
-    valid_bits_idx_ += values_read;                                              \
-                                                                                 \
-    return Status::OK();                                                         \
-  }
-
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
-    TypedColumnReader<Int96Type>* reader, int64_t values_to_read, int64_t* levels_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
-  auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
-  int64_t values_read;
-  PARQUET_CATCH_NOT_OK(*levels_read =
-                           reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
-                                             nullptr, values, &values_read));
-
-  int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
-  for (int64_t i = 0; i < values_read; i++) {
-    *out_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
-  }
-  valid_bits_idx_ += values_read;
-
-  return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
-    TypedColumnReader<Int32Type>* reader, int64_t values_to_read, int64_t* levels_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
-  auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
-  int64_t values_read;
-  PARQUET_CATCH_NOT_OK(*levels_read =
-                           reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
-                                             nullptr, values, &values_read));
-
-  int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
-  for (int64_t i = 0; i < values_read; i++) {
-    *out_ptr++ = static_cast<int64_t>(values[i]) * 86400000;
-  }
-  valid_bits_idx_ += values_read;
-
-  return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
-    TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
-    int64_t* levels_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
-  auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
-  int64_t values_read;
-  PARQUET_CATCH_NOT_OK(*levels_read =
-                           reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
-                                             nullptr, values, &values_read));
-
-  for (int64_t i = 0; i < values_read; i++) {
-    if (values[i]) {
-      ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_);
-    }
-    valid_bits_idx_++;
-  }
-
-  return Status::OK();
-}
-
-template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::ReadNullableBatch(TypedColumnReader<ParquetType>* reader,
-                                        int16_t* def_levels, int16_t* rep_levels,
-                                        int64_t values_to_read, int64_t* levels_read,
-                                        int64_t* values_read) {
-  using ArrowCType = typename ArrowType::c_type;
-  using ParquetCType = typename ParquetType::c_type;
-
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
-  auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
-  int64_t null_count;
-  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
-      static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
-      valid_bits_idx_, levels_read, values_read, &null_count));
-
-  auto data_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
-  INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
-
-  for (int64_t i = 0; i < *values_read; i++) {
-    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
-      data_ptr[valid_bits_idx_ + i] = values[i];
-    }
-    READ_NEXT_BITSET(valid_bits_ptr_);
-  }
-  null_count_ += null_count;
-  valid_bits_idx_ += *values_read;
-
-  return Status::OK();
-}
-
-#define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType)                    \
-  template <>                                                                      \
-  Status PrimitiveImpl::ReadNullableBatch<ArrowType, ParquetType>(                 \
-      TypedColumnReader<ParquetType> * reader, int16_t * def_levels,               \
-      int16_t * rep_levels, int64_t values_to_read, int64_t * levels_read,         \
-      int64_t * values_read) {                                                     \
-    auto data_ptr = reinterpret_cast<CType*>(data_buffer_ptr_);                    \
-    int64_t null_count;                                                            \
-    PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(                                  \
-        static_cast<int>(values_to_read), def_levels, rep_levels,                  \
-        data_ptr + valid_bits_idx_, valid_bits_ptr_, valid_bits_idx_, levels_read, \
-        values_read, &null_count));                                                \
-                                                                                   \
-    valid_bits_idx_ += *values_read;                                               \
-    null_count_ += null_count;                                                     \
-                                                                                   \
-    return Status::OK();                                                           \
-  }
-
-NULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
-NULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
-NULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
-    TypedColumnReader<Int96Type>* reader, int16_t* def_levels, int16_t* rep_levels,
-    int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
-  auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
-  int64_t null_count;
-  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
-      static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
-      valid_bits_idx_, levels_read, values_read, &null_count));
-
-  auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
-  INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
-  for (int64_t i = 0; i < *values_read; i++) {
-    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
-      data_ptr[valid_bits_idx_ + i] = impala_timestamp_to_nanoseconds(values[i]);
-    }
-    READ_NEXT_BITSET(valid_bits_ptr_);
-  }
-  null_count_ += null_count;
-  valid_bits_idx_ += *values_read;
-
-  return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::Date64Type, Int32Type>(
-    TypedColumnReader<Int32Type>* reader, int16_t* def_levels, int16_t* rep_levels,
-    int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
-  auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
-  int64_t null_count;
-  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
-      static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
-      valid_bits_idx_, levels_read, values_read, &null_count));
-
-  auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
-  INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
-  for (int64_t i = 0; i < *values_read; i++) {
-    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
-      data_ptr[valid_bits_idx_ + i] = static_cast<int64_t>(values[i]) * 86400000;
-    }
-    READ_NEXT_BITSET(valid_bits_ptr_);
-  }
-  null_count_ += null_count;
-  valid_bits_idx_ += *values_read;
-
-  return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
-    TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels,
-    int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
-  auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
-  int64_t null_count;
-  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
-      static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
-      valid_bits_idx_, levels_read, values_read, &null_count));
-
-  INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
-  for (int64_t i = 0; i < *values_read; i++) {
-    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
-      if (values[i]) {
-        ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_ + i);
-      }
-    }
-    READ_NEXT_BITSET(valid_bits_ptr_);
-  }
-  valid_bits_idx_ += *values_read;
-  null_count_ += null_count;
-
-  return Status::OK();
-}
-
-template <typename ArrowType>
-Status PrimitiveImpl::InitDataBuffer(int batch_size) {
-  using ArrowCType = typename ArrowType::c_type;
-  data_buffer_ = std::make_shared<PoolBuffer>(pool_);
-  RETURN_NOT_OK(data_buffer_->Resize(batch_size * sizeof(ArrowCType), false));
-  data_buffer_ptr_ = data_buffer_->mutable_data();
-
-  return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
-  data_buffer_ = std::make_shared<PoolBuffer>(pool_);
-  RETURN_NOT_OK(data_buffer_->Resize(::arrow::BitUtil::CeilByte(batch_size) / 8, false));
-  data_buffer_ptr_ = data_buffer_->mutable_data();
-  memset(data_buffer_ptr_, 0, data_buffer_->size());
-
-  return Status::OK();
-}
-
-Status PrimitiveImpl::InitValidBits(int batch_size) {
-  valid_bits_idx_ = 0;
-  if (descr_->max_definition_level() > 0) {
-    int valid_bits_size =
-        static_cast<int>(::arrow::BitUtil::CeilByte(batch_size + 1)) / 8;
-    valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
-    RETURN_NOT_OK(valid_bits_buffer_->Resize(valid_bits_size, false));
-    valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
-    memset(valid_bits_ptr_, 0, valid_bits_size);
-    null_count_ = 0;
-  }
-  return Status::OK();
-}
-
-Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
-                                        const int16_t* rep_levels,
-                                        int64_t total_levels_read,
-                                        std::shared_ptr<Array>* array) {
   std::shared_ptr<::arrow::Schema> arrow_schema;
   RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
                                   input_->metadata()->key_value_metadata(),
@@ -1021,8 +687,8 @@
 
     std::shared_ptr<Array> output(*array);
     for (int64_t j = list_depth - 1; j >= 0; j--) {
-      auto list_type = std::make_shared<::arrow::ListType>(
-          std::make_shared<Field>("item", output->type(), nullable[j + 1]));
+      auto list_type =
+          ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
       output = std::make_shared<::arrow::ListArray>(
           list_type, list_lengths[j], offsets[j], output, valid_bits[j], null_counts[j]);
     }
@@ -1032,346 +698,291 @@
 }
 
 template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::TypedReadBatch(int batch_size, std::shared_ptr<Array>* out) {
+struct supports_fast_path_impl {
   using ArrowCType = typename ArrowType::c_type;
-
-  int values_to_read = batch_size;
-  int total_levels_read = 0;
-  RETURN_NOT_OK(InitDataBuffer<ArrowType>(batch_size));
-  RETURN_NOT_OK(InitValidBits(batch_size));
-  if (descr_->max_definition_level() > 0) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  if (descr_->max_repetition_level() > 0) {
-    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
-  while ((values_to_read > 0) && column_reader_) {
-    auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
-    int64_t values_read;
-    int64_t levels_read;
-    if (descr_->max_definition_level() == 0) {
-      RETURN_NOT_OK((ReadNonNullableBatch<ArrowType, ParquetType>(reader, values_to_read,
-                                                                  &values_read)));
-    } else {
-      // As per the defintion and checks for flat (list) columns:
-      // descr_->max_definition_level() > 0, <= 3
-      RETURN_NOT_OK((ReadNullableBatch<ArrowType, ParquetType>(
-          reader, def_levels + total_levels_read, rep_levels + total_levels_read,
-          values_to_read, &levels_read, &values_read)));
-      total_levels_read += static_cast<int>(levels_read);
-    }
-    values_to_read -= static_cast<int>(values_read);
-    if (!column_reader_->HasNext()) {
-      NextRowGroup();
-    }
-  }
-
-  // Shrink arrays as they may be larger than the output.
-  RETURN_NOT_OK(data_buffer_->Resize(valid_bits_idx_ * sizeof(ArrowCType)));
-  if (descr_->max_definition_level() > 0) {
-    if (valid_bits_idx_ < batch_size * 0.8) {
-      RETURN_NOT_OK(valid_bits_buffer_->Resize(
-          ::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8, false));
-    }
-    *out = std::make_shared<ArrayType<ArrowType>>(
-        field_->type(), valid_bits_idx_, data_buffer_, valid_bits_buffer_, null_count_);
-    // Relase the ownership as the Buffer is now part of a new Array
-    valid_bits_buffer_.reset();
-  } else {
-    *out = std::make_shared<ArrayType<ArrowType>>(field_->type(), valid_bits_idx_,
-                                                  data_buffer_);
-  }
-  // Relase the ownership as the Buffer is now part of a new Array
-  data_buffer_.reset();
-
-  // Check if we should transform this array into an list array.
-  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
-
-template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
-    int batch_size, std::shared_ptr<Array>* out) {
-  int values_to_read = batch_size;
-  int total_levels_read = 0;
-  RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size));
-  RETURN_NOT_OK(InitValidBits(batch_size));
-  if (descr_->max_definition_level() > 0) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  if (descr_->max_repetition_level() > 0) {
-    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
-  while ((values_to_read > 0) && column_reader_) {
-    auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
-    int64_t values_read;
-    int64_t levels_read;
-    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-    if (descr_->max_definition_level() == 0) {
-      RETURN_NOT_OK((ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
-          reader, values_to_read, &values_read)));
-    } else {
-      // As per the defintion and checks for flat columns:
-      // descr_->max_definition_level() == 1
-      RETURN_NOT_OK((ReadNullableBatch<::arrow::BooleanType, BooleanType>(
-          reader, def_levels + total_levels_read, rep_levels + total_levels_read,
-          values_to_read, &levels_read, &values_read)));
-      total_levels_read += static_cast<int>(levels_read);
-    }
-    values_to_read -= static_cast<int>(values_read);
-    if (!column_reader_->HasNext()) {
-      NextRowGroup();
-    }
-  }
-
-  if (descr_->max_definition_level() > 0) {
-    // TODO: Shrink arrays in the case they are too large
-    if (valid_bits_idx_ < batch_size * 0.8) {
-      // Shrink arrays as they are larger than the output.
-      // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays
-      //    without the need for a copy. Given a decent underlying allocator this
-      //    should still free some underlying pages to the OS.
-
-      auto data_buffer = std::make_shared<PoolBuffer>(pool_);
-      RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(bool)));
-      memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size());
-      data_buffer_ = data_buffer;
-
-      auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_);
-      RETURN_NOT_OK(
-          valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8));
-      memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
-             valid_bits_buffer->size());
-      valid_bits_buffer_ = valid_bits_buffer;
-    }
-    *out = std::make_shared<BooleanArray>(field_->type(), valid_bits_idx_, data_buffer_,
-                                          valid_bits_buffer_, null_count_);
-    // Relase the ownership
-    data_buffer_.reset();
-    valid_bits_buffer_.reset();
-  } else {
-    *out = std::make_shared<BooleanArray>(field_->type(), valid_bits_idx_, data_buffer_);
-    data_buffer_.reset();
-  }
-
-  // Check if we should transform this array into an list array.
-  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
+  using ParquetCType = typename ParquetType::c_type;
+  static constexpr bool value = std::is_same<ArrowCType, ParquetCType>::value;
+};
 
 template <typename ArrowType>
-Status PrimitiveImpl::ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out) {
-  using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
-
-  int total_levels_read = 0;
-  if (descr_->max_definition_level() > 0) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  if (descr_->max_repetition_level() > 0) {
-    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
-  int values_to_read = batch_size;
-  BuilderType builder(pool_);
-  while ((values_to_read > 0) && column_reader_) {
-    RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ByteArray), false));
-    auto reader = dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
-    int64_t values_read;
-    int64_t levels_read;
-    auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
-    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
-                             values_to_read, def_levels + total_levels_read,
-                             rep_levels + total_levels_read, values, &values_read));
-    values_to_read -= static_cast<int>(levels_read);
-    if (descr_->max_definition_level() == 0) {
-      for (int64_t i = 0; i < levels_read; i++) {
-        RETURN_NOT_OK(
-            builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
-      }
-    } else {
-      // descr_->max_definition_level() > 0
-      int values_idx = 0;
-      int nullable_elements = descr_->schema_node()->is_optional();
-      for (int64_t i = 0; i < levels_read; i++) {
-        if (nullable_elements &&
-            (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) {
-          RETURN_NOT_OK(builder.AppendNull());
-        } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
-          RETURN_NOT_OK(
-              builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
-                             values[values_idx].len));
-          values_idx++;
-        }
-      }
-      total_levels_read += static_cast<int>(levels_read);
-    }
-    if (!column_reader_->HasNext()) {
-      NextRowGroup();
-    }
-  }
-
-  RETURN_NOT_OK(builder.Finish(out));
-  // Check if we should transform this array into an list array.
-  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
+struct supports_fast_path_impl<ArrowType, ByteArrayType> {
+  static constexpr bool value = false;
+};
 
 template <typename ArrowType>
-Status PrimitiveImpl::ReadFLBABatch(int batch_size, int byte_width,
-                                    std::shared_ptr<Array>* out) {
-  using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
-  int total_levels_read = 0;
-  if (descr_->max_definition_level() > 0) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  if (descr_->max_repetition_level() > 0) {
-    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
+struct supports_fast_path_impl<ArrowType, FLBAType> {
+  static constexpr bool value = false;
+};
 
-  int values_to_read = batch_size;
-  BuilderType builder(::arrow::fixed_size_binary(byte_width), pool_);
-  while ((values_to_read > 0) && column_reader_) {
-    RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(FLBA), false));
-    auto reader = dynamic_cast<TypedColumnReader<FLBAType>*>(column_reader_.get());
-    int64_t values_read;
-    int64_t levels_read;
-    auto values = reinterpret_cast<FLBA*>(values_buffer_.mutable_data());
-    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
-                             values_to_read, def_levels + total_levels_read,
-                             rep_levels + total_levels_read, values, &values_read));
-    values_to_read -= static_cast<int>(levels_read);
-    if (descr_->max_definition_level() == 0) {
-      for (int64_t i = 0; i < levels_read; i++) {
-        RETURN_NOT_OK(builder.Append(values[i].ptr));
-      }
+template <typename ArrowType, typename ParquetType>
+using supports_fast_path =
+    typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value>::type;
+
+template <typename ArrowType, typename ParquetType, typename Enable = void>
+struct TransferFunctor {
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    int64_t length = reader->values_written();
+    std::shared_ptr<Buffer> data;
+    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data));
+
+    auto values = reinterpret_cast<const ParquetCType*>(reader->values());
+    auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
+    std::copy(values, values + length, out_ptr);
+
+    if (reader->nullable_values()) {
+      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      *out = std::make_shared<ArrayType<ArrowType>>(type, length, data, is_valid,
+                                                    reader->null_count());
     } else {
-      int values_idx = 0;
-      int nullable_elements = descr_->schema_node()->is_optional();
-      for (int64_t i = 0; i < levels_read; i++) {
-        if (nullable_elements &&
-            (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) {
-          RETURN_NOT_OK(builder.AppendNull());
-        } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
-          RETURN_NOT_OK(builder.Append(values[values_idx].ptr));
-          values_idx++;
-        }
-      }
-      total_levels_read += static_cast<int>(levels_read);
+      *out = std::make_shared<ArrayType<ArrowType>>(type, length, data);
     }
-    if (!column_reader_->HasNext()) {
-      NextRowGroup();
-    }
+    return Status::OK();
   }
+};
 
-  RETURN_NOT_OK(builder.Finish(out));
-  // Check if we should transform this array into an list array.
-  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
+template <typename ArrowType, typename ParquetType>
+struct TransferFunctor<ArrowType, ParquetType,
+                       supports_fast_path<ArrowType, ParquetType>> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    int64_t length = reader->values_written();
+    std::shared_ptr<PoolBuffer> values = reader->ReleaseValues();
+
+    if (reader->nullable_values()) {
+      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      *out = std::make_shared<ArrayType<ArrowType>>(type, length, values, is_valid,
+                                                    reader->null_count());
+    } else {
+      *out = std::make_shared<ArrayType<ArrowType>>(type, length, values);
+    }
+    return Status::OK();
+  }
+};
 
 template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
-    int batch_size, std::shared_ptr<Array>* out) {
-  return ReadByteArrayBatch<::arrow::BinaryType>(batch_size, out);
-}
+struct TransferFunctor<::arrow::BooleanType, BooleanType> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    int64_t length = reader->values_written();
+    std::shared_ptr<Buffer> data;
+
+    const int64_t buffer_size = BytesForBits(length);
+    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, buffer_size, &data));
+
+    // Transfer boolean values to packed bitmap
+    auto values = reinterpret_cast<const bool*>(reader->values());
+    uint8_t* data_ptr = data->mutable_data();
+    memset(data_ptr, 0, buffer_size);
+
+    for (int64_t i = 0; i < length; i++) {
+      if (values[i]) {
+        ::arrow::BitUtil::SetBit(data_ptr, i);
+      }
+    }
+
+    if (reader->nullable_values()) {
+      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      RETURN_NOT_OK(is_valid->Resize(BytesForBits(length), false));
+      *out = std::make_shared<BooleanArray>(type, length, data, is_valid,
+                                            reader->null_count());
+    } else {
+      *out = std::make_shared<BooleanArray>(type, length, data);
+    }
+    return Status::OK();
+  }
+};
 
 template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
-    int batch_size, std::shared_ptr<Array>* out) {
-  return ReadByteArrayBatch<::arrow::StringType>(batch_size, out);
-}
+struct TransferFunctor<::arrow::TimestampType, Int96Type> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    int64_t length = reader->values_written();
+    auto values = reinterpret_cast<const Int96*>(reader->values());
 
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)              \
-  case ::arrow::Type::ENUM:                                         \
-    return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
-    break;
+    std::shared_ptr<Buffer> data;
+    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
 
-Status PrimitiveImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
-  if (!column_reader_) {
+    auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
+    for (int64_t i = 0; i < length; i++) {
+      *data_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
+    }
+
+    if (reader->nullable_values()) {
+      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      *out = std::make_shared<TimestampArray>(type, length, data, is_valid,
+                                              reader->null_count());
+    } else {
+      *out = std::make_shared<TimestampArray>(type, length, data);
+    }
+
+    return Status::OK();
+  }
+};
+
+template <>
+struct TransferFunctor<::arrow::Date64Type, Int32Type> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    int64_t length = reader->values_written();
+    auto values = reinterpret_cast<const int32_t*>(reader->values());
+
+    std::shared_ptr<Buffer> data;
+    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
+    auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
+
+    for (int64_t i = 0; i < length; i++) {
+      *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsInADay;
+    }
+
+    if (reader->nullable_values()) {
+      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      *out = std::make_shared<::arrow::Date64Array>(type, length, data, is_valid,
+                                                    reader->null_count());
+    } else {
+      *out = std::make_shared<::arrow::Date64Array>(type, length, data);
+    }
+    return Status::OK();
+  }
+};
+
+template <typename ArrowType, typename ParquetType>
+struct TransferFunctor<
+    ArrowType, ParquetType,
+    typename std::enable_if<std::is_same<ParquetType, ByteArrayType>::value ||
+                            std::is_same<ParquetType, FLBAType>::value>::type> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    RETURN_NOT_OK(reader->builder()->Finish(out));
+
+    if (type->id() == ::arrow::Type::STRING) {
+      // Convert from BINARY type to STRING
+      auto new_data = (*out)->data()->ShallowCopy();
+      new_data->type = type;
+      RETURN_NOT_OK(::arrow::MakeArray(new_data, out));
+    }
+    return Status::OK();
+  }
+};
+
+#define TRANSFER_DATA(ArrowType, ParquetType)                            \
+  TransferFunctor<ArrowType, ParquetType> func;                          \
+  RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), out)); \
+  RETURN_NOT_OK(WrapIntoListArray<ParquetType>(out))
+
+#define TRANSFER_CASE(ENUM, ArrowType, ParquetType) \
+  case ::arrow::Type::ENUM: {                       \
+    TRANSFER_DATA(ArrowType, ParquetType);          \
+  } break;
+
+Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+  if (!record_reader_->HasMoreData()) {
     // Exhausted all row groups.
     *out = nullptr;
     return Status::OK();
   }
 
-  switch (field_->type()->id()) {
-    case ::arrow::Type::NA:
-      *out = std::make_shared<::arrow::NullArray>(batch_size);
-      return Status::OK();
-      break;
-      TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
-      TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
-      TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
-      TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
-      TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
-      TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
-      TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
-      TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
-      TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
-      TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
-      TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
-      TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
-      TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
-      TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type)
-      TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
-    case ::arrow::Type::FIXED_SIZE_BINARY: {
-      int32_t byte_width =
-          static_cast<::arrow::FixedSizeBinaryType*>(field_->type().get())->byte_width();
-      return ReadFLBABatch<::arrow::FixedSizeBinaryType>(batch_size, byte_width, out);
-      break;
+  if (field_->type()->id() == ::arrow::Type::NA) {
+    *out = std::make_shared<::arrow::NullArray>(records_to_read);
+    return Status::OK();
+  }
+
+  try {
+    // Pre-allocation gives much better performance for flat columns
+    record_reader_->Reserve(records_to_read);
+
+    record_reader_->Reset();
+    while (records_to_read > 0) {
+      if (!record_reader_->HasMoreData()) {
+        break;
+      }
+      int64_t records_read = record_reader_->ReadRecords(records_to_read);
+      records_to_read -= records_read;
+      if (records_read == 0) {
+        NextRowGroup();
+      }
     }
+  } catch (const ::parquet::ParquetException& e) {
+    return ::arrow::Status::IOError(e.what());
+  }
+
+  switch (field_->type()->id()) {
+    TRANSFER_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+    TRANSFER_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+    TRANSFER_CASE(INT8, ::arrow::Int8Type, Int32Type)
+    TRANSFER_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+    TRANSFER_CASE(INT16, ::arrow::Int16Type, Int32Type)
+    TRANSFER_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
+    TRANSFER_CASE(INT32, ::arrow::Int32Type, Int32Type)
+    TRANSFER_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+    TRANSFER_CASE(INT64, ::arrow::Int64Type, Int64Type)
+    TRANSFER_CASE(FLOAT, ::arrow::FloatType, FloatType)
+    TRANSFER_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+    TRANSFER_CASE(STRING, ::arrow::StringType, ByteArrayType)
+    TRANSFER_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
+    TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
+    TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
+    TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
     case ::arrow::Type::TIMESTAMP: {
       ::arrow::TimestampType* timestamp_type =
           static_cast<::arrow::TimestampType*>(field_->type().get());
       switch (timestamp_type->unit()) {
         case ::arrow::TimeUnit::MILLI:
-          return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
-          break;
-        case ::arrow::TimeUnit::MICRO:
-          return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
-          break;
-        case ::arrow::TimeUnit::NANO:
-          return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
-          break;
+        case ::arrow::TimeUnit::MICRO: {
+          TRANSFER_DATA(::arrow::TimestampType, Int64Type);
+        } break;
+        case ::arrow::TimeUnit::NANO: {
+          TRANSFER_DATA(::arrow::TimestampType, Int96Type);
+        } break;
         default:
           return Status::NotImplemented("TimeUnit not supported");
       }
       break;
     }
-      TYPED_BATCH_CASE(TIME32, ::arrow::Time32Type, Int32Type)
-      TYPED_BATCH_CASE(TIME64, ::arrow::Time64Type, Int64Type)
+      TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
+      TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)
     default:
       std::stringstream ss;
       ss << "No support for reading columns of type " << field_->type()->ToString();
       return Status::NotImplemented(ss.str());
   }
-}
 
-void PrimitiveImpl::NextRowGroup() { column_reader_ = input_->Next(); }
-
-Status PrimitiveImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
-  *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
-  *length = def_levels_buffer_.size() / sizeof(int16_t);
   return Status::OK();
 }
 
-Status PrimitiveImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
-  *data = reinterpret_cast<ValueLevelsPtr>(rep_levels_buffer_.data());
-  *length = rep_levels_buffer_.size() / sizeof(int16_t);
+void PrimitiveImpl::NextRowGroup() {
+  std::unique_ptr<PageReader> page_reader = input_->NextChunk();
+  record_reader_->SetPageReader(std::move(page_reader));
+}
+
+Status PrimitiveImpl::GetDefLevels(const int16_t** data, size_t* length) {
+  *data = record_reader_->def_levels();
+  *length = record_reader_->levels_written();
   return Status::OK();
 }
 
-ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+Status PrimitiveImpl::GetRepLevels(const int16_t** data, size_t* length) {
+  *data = record_reader_->rep_levels();
+  *length = record_reader_->levels_written();
+  return Status::OK();
+}
+
+ColumnReader::ColumnReader(std::unique_ptr<ColumnReaderImpl> impl)
+    : impl_(std::move(impl)) {}
 
 ColumnReader::~ColumnReader() {}
 
-Status ColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
-  return impl_->NextBatch(batch_size, out);
+Status ColumnReader::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+  return impl_->NextBatch(records_to_read, out);
 }
 
 // StructImpl methods
@@ -1380,7 +991,7 @@
                                         int64_t* null_count_out) {
   std::shared_ptr<Buffer> null_bitmap;
   auto null_count = 0;
-  ValueLevelsPtr def_levels_data;
+  const int16_t* def_levels_data;
   size_t def_levels_length;
   RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
   RETURN_NOT_OK(GetEmptyBitmap(pool_, def_levels_length, &null_bitmap));
@@ -1402,7 +1013,7 @@
 
 // TODO(itaiin): Consider caching the results of this calculation -
 //   note that this is only used once for each read for now
-Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
+Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
   *data = nullptr;
   if (children_.size() == 0) {
     // Empty struct
@@ -1411,7 +1022,7 @@
   }
 
   // We have at least one child
-  ValueLevelsPtr child_def_levels;
+  const int16_t* child_def_levels;
   size_t child_length;
   RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length));
   auto size = child_length * sizeof(int16_t);
@@ -1438,27 +1049,27 @@
           std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_));
     }
   }
-  *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
+  *data = reinterpret_cast<const int16_t*>(def_levels_buffer_.data());
   *length = child_length;
   return Status::OK();
 }
 
-void StructImpl::InitField(const NodePtr& node,
-                           const std::vector<std::shared_ptr<Impl>>& children) {
+void StructImpl::InitField(
+    const Node* node, const std::vector<std::shared_ptr<ColumnReaderImpl>>& children) {
   // Make a shallow node to field conversion from the children fields
   std::vector<std::shared_ptr<::arrow::Field>> fields(children.size());
   for (size_t i = 0; i < children.size(); i++) {
     fields[i] = children[i]->field();
   }
-  auto type = std::make_shared<::arrow::StructType>(fields);
-  field_ = std::make_shared<Field>(node->name(), type);
+  auto type = ::arrow::struct_(fields);
+  field_ = ::arrow::field(node->name(), type);
 }
 
-Status StructImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
+Status StructImpl::GetRepLevels(const int16_t** data, size_t* length) {
   return Status::NotImplemented("GetRepLevels is not implemented for struct");
 }
 
-Status StructImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+Status StructImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
   std::vector<std::shared_ptr<Array>> children_arrays;
   std::shared_ptr<Buffer> null_bitmap;
   int64_t null_count;
@@ -1467,16 +1078,23 @@
   for (auto& child : children_) {
     std::shared_ptr<Array> child_array;
 
-    RETURN_NOT_OK(child->NextBatch(batch_size, &child_array));
-
+    RETURN_NOT_OK(child->NextBatch(records_to_read, &child_array));
     children_arrays.push_back(child_array);
   }
 
   RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count));
 
-  *out = std::make_shared<StructArray>(field()->type(), batch_size, children_arrays,
-                                       null_bitmap, null_count);
+  int64_t struct_length = children_arrays[0]->length();
+  for (size_t i = 1; i < children_arrays.size(); ++i) {
+    if (children_arrays[i]->length() != struct_length) {
+      // TODO(wesm): This should really only occur if the Parquet file is
+      // malformed. Should this be a DCHECK?
+      return Status::Invalid("Struct children had different lengths");
+    }
+  }
 
+  *out = std::make_shared<StructArray>(field()->type(), struct_length, children_arrays,
+                                       null_bitmap, null_count);
   return Status::OK();
 }
 
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index ce82375..faaef9a 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -173,7 +173,7 @@
 // might change in the future.
 class PARQUET_EXPORT ColumnReader {
  public:
-  class PARQUET_NO_EXPORT Impl;
+  class PARQUET_NO_EXPORT ColumnReaderImpl;
   virtual ~ColumnReader();
 
   // Scan the next array of the indicated size. The actual size of the
@@ -185,11 +185,11 @@
   //
   // Returns Status::OK on a successful read, including if you have exhausted
   // the data available in the file.
-  ::arrow::Status NextBatch(int batch_size, std::shared_ptr<::arrow::Array>* out);
+  ::arrow::Status NextBatch(int64_t batch_size, std::shared_ptr<::arrow::Array>* out);
 
  private:
-  std::unique_ptr<Impl> impl_;
-  explicit ColumnReader(std::unique_ptr<Impl> impl);
+  std::unique_ptr<ColumnReaderImpl> impl_;
+  explicit ColumnReader(std::unique_ptr<ColumnReaderImpl> impl);
 
   friend class FileReader;
   friend class PrimitiveImpl;
diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc
new file mode 100644
index 0000000..7275d2f
--- /dev/null
+++ b/src/parquet/arrow/record_reader.cc
@@ -0,0 +1,807 @@
+// licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/record_reader.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <sstream>
+
+#include <arrow/buffer.h>
+#include <arrow/memory_pool.h>
+#include <arrow/status.h>
+#include <arrow/util/bit-util.h>
+#include <arrow/util/rle-encoding.h>
+
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/encoding-internal.h"
+#include "parquet/exception.h"
+#include "parquet/properties.h"
+
+using arrow::MemoryPool;
+
+namespace parquet {
+namespace internal {
+
+namespace BitUtil = ::arrow::BitUtil;
+
+template <typename DType>
+class TypedRecordReader;
+
+// PLAIN_DICTIONARY is deprecated, but was historically used as a dictionary
+// index encoding, so we must still accept it here.
+static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
+  return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
+}
+
+class RecordReader::RecordReaderImpl {
+ public:
+  RecordReaderImpl(const ColumnDescriptor* descr, MemoryPool* pool)
+      : descr_(descr),
+        pool_(pool),
+        num_buffered_values_(0),
+        num_decoded_values_(0),
+        max_def_level_(descr->max_definition_level()),
+        max_rep_level_(descr->max_repetition_level()),
+        at_record_start_(false),
+        records_read_(0),
+        values_written_(0),
+        values_capacity_(0),
+        null_count_(0),
+        levels_written_(0),
+        levels_position_(0),
+        levels_capacity_(0) {
+    nullable_values_ = internal::HasSpacedValues(descr);
+    values_ = std::make_shared<PoolBuffer>(pool);
+    valid_bits_ = std::make_shared<PoolBuffer>(pool);
+    def_levels_ = std::make_shared<PoolBuffer>(pool);
+    rep_levels_ = std::make_shared<PoolBuffer>(pool);
+
+    if (descr->physical_type() == Type::BYTE_ARRAY) {
+      builder_.reset(new ::arrow::BinaryBuilder(pool));
+    } else if (descr->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
+      int byte_width = descr->type_length();
+      std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
+      builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool));
+    }
+    Reset();
+  }
+
+  virtual ~RecordReaderImpl() {}
+
+  virtual int64_t ReadRecords(int64_t num_records) = 0;
+
+  // Dictionary decoders must be reset when advancing row groups
+  virtual void ResetDecoders() = 0;
+
+  void SetPageReader(std::unique_ptr<PageReader> reader) {
+    pager_ = std::move(reader);
+    ResetDecoders();
+  }
+
+  bool HasMoreData() const { return pager_ != nullptr; }
+
+  int16_t* def_levels() const {
+    return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
+  }
+
+  int16_t* rep_levels() {
+    return reinterpret_cast<int16_t*>(rep_levels_->mutable_data());
+  }
+
+  uint8_t* values() const { return values_->mutable_data(); }
+
+  /// \brief Number of values written including nulls (if any)
+  int64_t values_written() const { return values_written_; }
+
+  int64_t levels_position() const { return levels_position_; }
+  int64_t levels_written() const { return levels_written_; }
+
+  // The column chunk may appear exhausted from the outside while we are
+  // still processing the final batch of buffered levels and values
+  bool has_values_to_process() const { return levels_position_ < levels_written_; }
+
+  int64_t null_count() const { return null_count_; }
+
+  bool nullable_values() const { return nullable_values_; }
+
+  std::shared_ptr<PoolBuffer> ReleaseValues() {
+    auto result = values_;
+    values_ = std::make_shared<PoolBuffer>(pool_);
+    return result;
+  }
+
+  std::shared_ptr<PoolBuffer> ReleaseIsValid() {
+    auto result = valid_bits_;
+    valid_bits_ = std::make_shared<PoolBuffer>(pool_);
+    return result;
+  }
+
+  ::arrow::ArrayBuilder* builder() { return builder_.get(); }
+
+  // Process written repetition/definition levels to reach the end of
+  // records. Processes no more levels than necessary to delimit the
+  // indicated number of logical records. Updates the internal state of
+  // the RecordReader.
+  // \return Number of records delimited
+  int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
+    int64_t values_to_read = 0;
+    int64_t records_read = 0;
+
+    const int16_t* def_levels = this->def_levels() + levels_position_;
+    const int16_t* rep_levels = this->rep_levels() + levels_position_;
+
+    DCHECK_GT(max_rep_level_, 0);
+
+    // Count logical records and number of values to read
+    while (levels_position_ < levels_written_) {
+      if (*rep_levels++ == 0) {
+        at_record_start_ = true;
+        if (records_read == num_records) {
+          // We've found the number of records we were looking for
+          break;
+        } else {
+          // Continue
+          ++records_read;
+        }
+      } else {
+        at_record_start_ = false;
+      }
+      if (*def_levels++ == max_def_level_) {
+        ++values_to_read;
+      }
+      ++levels_position_;
+    }
+    *values_seen = values_to_read;
+    return records_read;
+  }
+
+  // Read multiple definition levels into preallocated memory
+  //
+  // Returns the number of decoded definition levels
+  int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
+    if (descr_->max_definition_level() == 0) {
+      return 0;
+    }
+    return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
+  }
+
+  int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
+    if (descr_->max_repetition_level() == 0) {
+      return 0;
+    }
+    return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
+  }
+
+  int64_t available_values_current_page() const {
+    return num_buffered_values_ - num_decoded_values_;
+  }
+
+  void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
+
+  Type::type type() const { return descr_->physical_type(); }
+
+  const ColumnDescriptor* descr() const { return descr_; }
+
+  void Reserve(int64_t capacity) {
+    ReserveLevels(capacity);
+    ReserveValues(capacity);
+  }
+
+  void ReserveLevels(int64_t capacity) {
+    if (descr_->max_definition_level() > 0 &&
+        (levels_written_ + capacity > levels_capacity_)) {
+      int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1);
+      while (levels_written_ + capacity > new_levels_capacity) {
+        new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1);
+      }
+      PARQUET_THROW_NOT_OK(
+          def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
+      if (descr_->max_repetition_level() > 0) {
+        PARQUET_THROW_NOT_OK(
+            rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
+      }
+      levels_capacity_ = new_levels_capacity;
+    }
+  }
+
+  void ReserveValues(int64_t capacity) {
+    if (values_written_ + capacity > values_capacity_) {
+      int64_t new_values_capacity = BitUtil::NextPower2(values_capacity_ + 1);
+      while (values_written_ + capacity > new_values_capacity) {
+        new_values_capacity = BitUtil::NextPower2(new_values_capacity + 1);
+      }
+
+      int type_size = GetTypeByteSize(descr_->physical_type());
+      PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false));
+      values_capacity_ = new_values_capacity;
+    }
+    if (nullable_values_) {
+      int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_);
+      if (valid_bits_->size() < valid_bytes_new) {
+        int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_);
+        PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));
+
+        // Avoid valgrind warnings
+        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
+               valid_bytes_new - valid_bytes_old);
+      }
+    }
+  }
+
+  void Reset() {
+    ResetValues();
+
+    if (levels_written_ > 0) {
+      const int64_t levels_remaining = levels_written_ - levels_position_;
+      // Shift remaining levels to beginning of buffer and trim to only the number
+      // of decoded levels remaining
+      int16_t* def_data = def_levels();
+      int16_t* rep_data = rep_levels();
+
+      std::copy(def_data + levels_position_, def_data + levels_written_, def_data);
+      std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data);
+
+      PARQUET_THROW_NOT_OK(
+          def_levels_->Resize(levels_remaining * sizeof(int16_t), false));
+      PARQUET_THROW_NOT_OK(
+          rep_levels_->Resize(levels_remaining * sizeof(int16_t), false));
+
+      levels_written_ -= levels_position_;
+      levels_position_ = 0;
+      levels_capacity_ = levels_remaining;
+    }
+
+    records_read_ = 0;
+
+    // Calling Finish on the builders also resets them
+  }
+
+  void ResetValues() {
+    if (values_written_ > 0) {
+      // Resize to 0, but do not shrink to fit
+      PARQUET_THROW_NOT_OK(values_->Resize(0, false));
+      PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
+      values_written_ = 0;
+      values_capacity_ = 0;
+      null_count_ = 0;
+    }
+  }
+
+ protected:
+  const ColumnDescriptor* descr_;
+  ::arrow::MemoryPool* pool_;
+
+  std::unique_ptr<PageReader> pager_;
+  std::shared_ptr<Page> current_page_;
+
+  // Not set if full schema for this field has no optional or repeated elements
+  LevelDecoder definition_level_decoder_;
+
+  // Not set for flat schemas.
+  LevelDecoder repetition_level_decoder_;
+
+  // The total number of values stored in the data page. This is the maximum of
+  // the number of encoded definition levels or encoded values. For
+  // non-repeated, required columns, this is equal to the number of encoded
+  // values. For repeated or optional values, there may be fewer data values
+  // than levels, and this tells you how many encoded levels there are in that
+  // case.
+  int64_t num_buffered_values_;
+
+  // The number of values from the current data page that have been decoded
+  // into memory
+  int64_t num_decoded_values_;
+
+  const int max_def_level_;
+  const int max_rep_level_;
+
+  bool nullable_values_;
+
+  bool at_record_start_;
+  int64_t records_read_;
+
+  int64_t values_written_;
+  int64_t values_capacity_;
+  int64_t null_count_;
+
+  int64_t levels_written_;
+  int64_t levels_position_;
+  int64_t levels_capacity_;
+
+  // TODO(wesm): ByteArray / FixedLenByteArray types
+  std::unique_ptr<::arrow::ArrayBuilder> builder_;
+
+  std::shared_ptr<::arrow::PoolBuffer> values_;
+
+  template <typename T>
+  T* ValuesHead() {
+    return reinterpret_cast<T*>(values_->mutable_data()) + values_written_;
+  }
+
+  std::shared_ptr<::arrow::PoolBuffer> valid_bits_;
+  std::shared_ptr<::arrow::PoolBuffer> def_levels_;
+  std::shared_ptr<::arrow::PoolBuffer> rep_levels_;
+};
+
+// The minimum number of repetition/definition levels to decode at a time, for
+// better vectorized performance when doing many smaller record reads
+constexpr int64_t kMinLevelBatchSize = 1024;
+
+// Typed implementation of the record-delimiting reader; DType is one of the
+// Parquet physical type traits (Int32Type, ByteArrayType, FLBAType, ...)
+template <typename DType>
+class TypedRecordReader : public RecordReader::RecordReaderImpl {
+ public:
+  typedef typename DType::c_type T;
+
+  ~TypedRecordReader() {}
+
+  TypedRecordReader(const ColumnDescriptor* schema, ::arrow::MemoryPool* pool)
+      : RecordReader::RecordReaderImpl(schema, pool), current_decoder_(nullptr) {}
+
+  // Drop all per-column-chunk decoders; a new dictionary page must then be
+  // seen before any RLE_DICTIONARY-encoded data page can be decoded
+  void ResetDecoders() override { decoders_.clear(); }
+
+  // Decode values_with_nulls slots (of which null_count are null) into the
+  // values buffer, leaving gaps at the positions marked null in valid_bits_
+  inline void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
+    uint8_t* valid_bits = valid_bits_->mutable_data();
+    const int64_t valid_bits_offset = values_written_;
+
+    int64_t num_decoded = current_decoder_->DecodeSpaced(
+        ValuesHead<T>(), static_cast<int>(values_with_nulls),
+        static_cast<int>(null_count), valid_bits, valid_bits_offset);
+    DCHECK_EQ(num_decoded, values_with_nulls);
+  }
+
+  // Decode values_to_read non-null values contiguously into the values buffer
+  inline void ReadValuesDense(int64_t values_to_read) {
+    int64_t num_decoded =
+        current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read));
+    DCHECK_EQ(num_decoded, values_to_read);
+  }
+
+  // Consume up to num_records logical records from the levels already
+  // buffered in def_levels_/rep_levels_, then decode the corresponding values.
+  // Return number of logical records read
+  int64_t ReadRecordData(const int64_t num_records) {
+    // Conservative upper bound
+    const int64_t possible_num_values =
+        std::max(num_records, levels_written_ - levels_position_);
+    ReserveValues(possible_num_values);
+
+    const int64_t start_levels_position = levels_position_;
+
+    int64_t values_to_read = 0;
+    int64_t records_read = 0;
+    if (max_rep_level_ > 0) {
+      // Repeated field: a record may span multiple levels, so scan the
+      // repetition levels to find record boundaries
+      records_read = DelimitRecords(num_records, &values_to_read);
+    } else if (max_def_level_ > 0) {
+      // No repetition levels, skip delimiting logic. Each level represents a
+      // null or not null entry
+      records_read = std::min(levels_written_ - levels_position_, num_records);
+
+      // This is advanced by DelimitRecords, which we skipped
+      levels_position_ += records_read;
+    } else {
+      // Required, non-repeated field: exactly one value per record, no levels
+      records_read = values_to_read = num_records;
+    }
+
+    int64_t null_count = 0;
+    if (nullable_values_) {
+      int64_t values_with_nulls = 0;
+      // Convert the consumed definition levels into a validity bitmap and a
+      // null count, appended at bit offset values_written_
+      internal::DefinitionLevelsToBitmap(
+          def_levels() + start_levels_position, levels_position_ - start_levels_position,
+          max_def_level_, max_rep_level_, &values_with_nulls, &null_count,
+          valid_bits_->mutable_data(), values_written_);
+      values_to_read = values_with_nulls - null_count;
+      ReadValuesSpaced(values_with_nulls, null_count);
+      ConsumeBufferedValues(levels_position_ - start_levels_position);
+    } else {
+      ReadValuesDense(values_to_read);
+      ConsumeBufferedValues(values_to_read);
+    }
+    // Total values, including null spaces, if any
+    values_written_ += values_to_read + null_count;
+    null_count_ += null_count;
+
+    return records_read;
+  }
+
+  // Returns true if there are still values in this column.
+  bool HasNext() {
+    // Either there is no data page available yet, or the data page has been
+    // exhausted
+    if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
+      if (!ReadNewPage() || num_buffered_values_ == 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  int64_t ReadRecords(int64_t num_records) override {
+    // Delimit records, then read values at the end
+    int64_t records_read = 0;
+
+    // First drain any levels still buffered from a previous call
+    if (levels_position_ < levels_written_) {
+      records_read += ReadRecordData(num_records);
+    }
+
+    // HasNext invokes ReadNewPage
+    if (records_read == 0 && !HasNext()) {
+      return 0;
+    }
+
+    // Decode levels in large batches even for small record requests, for
+    // better vectorized performance
+    int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);
+
+    // If we are in the middle of a record, we continue until reaching the
+    // desired number of records or the end of the current record if we've found
+    // enough records
+    while (!at_record_start_ || records_read < num_records) {
+      // Is there more data to read in this row group?
+      if (!HasNext()) {
+        break;
+      }
+
+      /// We perform multiple batch reads until we either exhaust the row group
+      /// or observe the desired number of records
+      int64_t batch_size = std::min(level_batch_size, available_values_current_page());
+
+      // No more data in column
+      if (batch_size == 0) {
+        break;
+      }
+
+      if (max_def_level_ > 0) {
+        ReserveLevels(batch_size);
+
+        int16_t* def_levels = this->def_levels() + levels_written_;
+        int16_t* rep_levels = this->rep_levels() + levels_written_;
+
+        // Not present for non-repeated fields
+        int64_t levels_read = 0;
+        if (max_rep_level_ > 0) {
+          levels_read = ReadDefinitionLevels(batch_size, def_levels);
+          if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
+            throw ParquetException("Number of decoded rep / def levels did not match");
+          }
+        } else if (max_def_level_ > 0) {
+          levels_read = ReadDefinitionLevels(batch_size, def_levels);
+        }
+
+        // Exhausted column chunk
+        if (levels_read == 0) {
+          break;
+        }
+
+        levels_written_ += levels_read;
+        records_read += ReadRecordData(num_records - records_read);
+      } else {
+        // No repetition or definition levels
+        batch_size = std::min(num_records - records_read, batch_size);
+        records_read += ReadRecordData(batch_size);
+      }
+    }
+
+    return records_read;
+  }
+
+ private:
+  typedef Decoder<DType> DecoderType;
+
+  // Map of encoding type to the respective decoder object. For example, a
+  // column chunk's data pages may include both dictionary-encoded and
+  // plain-encoded data.
+  std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;
+
+  // Decoder for the current data page; owned by decoders_, never null after
+  // the first data page has been read
+  DecoderType* current_decoder_;
+
+  // Advance to the next data page
+  bool ReadNewPage();
+
+  void ConfigureDictionary(const DictionaryPage* page);
+};
+
+// BYTE_ARRAY specialization: unlike the primitive path, decoded values are
+// appended directly into the Arrow BinaryBuilder and the scratch values
+// buffer is recycled via ResetValues
+template <>
+inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t values_to_read) {
+  auto values = ValuesHead<ByteArray>();
+  int64_t num_decoded =
+      current_decoder_->Decode(values, static_cast<int>(values_to_read));
+  DCHECK_EQ(num_decoded, values_to_read);
+
+  auto builder = static_cast<::arrow::BinaryBuilder*>(builder_.get());
+  for (int64_t i = 0; i < num_decoded; i++) {
+    PARQUET_THROW_NOT_OK(
+        builder->Append(values[i].ptr, static_cast<int64_t>(values[i].len)));
+  }
+  ResetValues();
+}
+
+// FIXED_LEN_BYTE_ARRAY specialization: decoded values are appended into the
+// Arrow FixedSizeBinaryBuilder (element width comes from the builder's type),
+// then the scratch values buffer is recycled via ResetValues
+template <>
+inline void TypedRecordReader<FLBAType>::ReadValuesDense(int64_t values_to_read) {
+  auto values = ValuesHead<FLBA>();
+  int64_t num_decoded =
+      current_decoder_->Decode(values, static_cast<int>(values_to_read));
+  DCHECK_EQ(num_decoded, values_to_read);
+
+  auto builder = static_cast<::arrow::FixedSizeBinaryBuilder*>(builder_.get());
+  for (int64_t i = 0; i < num_decoded; i++) {
+    PARQUET_THROW_NOT_OK(builder->Append(values[i].ptr));
+  }
+  ResetValues();
+}
+
+// BYTE_ARRAY spaced read: values_to_read slots include null_count nulls, with
+// validity indicated by valid_bits. Non-null slots are appended to the
+// BinaryBuilder; null slots become AppendNull entries
+template <>
+inline void TypedRecordReader<ByteArrayType>::ReadValuesSpaced(int64_t values_to_read,
+                                                               int64_t null_count) {
+  uint8_t* valid_bits = valid_bits_->mutable_data();
+  const int64_t valid_bits_offset = values_written_;
+  auto values = ValuesHead<ByteArray>();
+
+  int64_t num_decoded = current_decoder_->DecodeSpaced(
+      values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits,
+      valid_bits_offset);
+  DCHECK_EQ(num_decoded, values_to_read);
+
+  auto builder = static_cast<::arrow::BinaryBuilder*>(builder_.get());
+
+  for (int64_t i = 0; i < num_decoded; i++) {
+    if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
+      PARQUET_THROW_NOT_OK(
+          builder->Append(values[i].ptr, static_cast<int64_t>(values[i].len)));
+    } else {
+      PARQUET_THROW_NOT_OK(builder->AppendNull());
+    }
+  }
+  ResetValues();
+}
+
+// FIXED_LEN_BYTE_ARRAY spaced read: mirrors the BYTE_ARRAY version above but
+// targets the FixedSizeBinaryBuilder (no per-value length)
+template <>
+inline void TypedRecordReader<FLBAType>::ReadValuesSpaced(int64_t values_to_read,
+                                                          int64_t null_count) {
+  uint8_t* valid_bits = valid_bits_->mutable_data();
+  const int64_t valid_bits_offset = values_written_;
+  auto values = ValuesHead<FLBA>();
+
+  int64_t num_decoded = current_decoder_->DecodeSpaced(
+      values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits,
+      valid_bits_offset);
+  DCHECK_EQ(num_decoded, values_to_read);
+
+  auto builder = static_cast<::arrow::FixedSizeBinaryBuilder*>(builder_.get());
+  for (int64_t i = 0; i < num_decoded; i++) {
+    if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
+      PARQUET_THROW_NOT_OK(builder->Append(values[i].ptr));
+    } else {
+      PARQUET_THROW_NOT_OK(builder->AppendNull());
+    }
+  }
+  ResetValues();
+}
+
+// Initialize the dictionary decoder from a DICTIONARY_PAGE. Only one
+// dictionary per column chunk is permitted.
+template <typename DType>
+inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage* page) {
+  int encoding = static_cast<int>(page->encoding());
+  // PLAIN / PLAIN_DICTIONARY dictionary pages feed RLE_DICTIONARY-encoded
+  // data pages, so key the decoder map under RLE_DICTIONARY
+  if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
+      page->encoding() == Encoding::PLAIN) {
+    encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
+  }
+
+  auto it = decoders_.find(encoding);
+  if (it != decoders_.end()) {
+    throw ParquetException("Column cannot have more than one dictionary.");
+  }
+
+  if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
+      page->encoding() == Encoding::PLAIN) {
+    PlainDecoder<DType> dictionary(descr_);
+    dictionary.SetData(page->num_values(), page->data(), page->size());
+
+    // The dictionary is fully decoded during DictionaryDecoder::Init, so the
+    // DictionaryPage buffer is no longer required after this step
+    //
+    // TODO(wesm): investigate whether this all-or-nothing decoding of the
+    // dictionary makes sense and whether performance can be improved
+
+    auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
+    decoder->SetDict(&dictionary);
+    decoders_[encoding] = decoder;
+  } else {
+    ParquetException::NYI("only plain dictionary encoding has been implemented");
+  }
+
+  current_decoder_ = decoders_[encoding].get();
+}
+
+// Advance to the next DATA_PAGE in the column chunk, configuring level
+// decoders and the value decoder for that page. Dictionary pages and unknown
+// page types are consumed along the way. Returns false at end of the chunk.
+template <typename DType>
+bool TypedRecordReader<DType>::ReadNewPage() {
+  // Loop until we find the next data page.
+  const uint8_t* buffer;
+
+  while (true) {
+    current_page_ = pager_->NextPage();
+    if (!current_page_) {
+      // EOS
+      return false;
+    }
+
+    if (current_page_->type() == PageType::DICTIONARY_PAGE) {
+      ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
+      continue;
+    } else if (current_page_->type() == PageType::DATA_PAGE) {
+      const DataPage* page = static_cast<const DataPage*>(current_page_.get());
+
+      // Read a data page.
+      num_buffered_values_ = page->num_values();
+
+      // Have not decoded any values from the data page yet
+      num_decoded_values_ = 0;
+
+      buffer = page->data();
+
+      // If the data page includes repetition and definition levels, we
+      // initialize the level decoder and subtract the encoded level bytes from
+      // the page size to determine the number of bytes in the encoded data.
+      int64_t data_size = page->size();
+
+      // Data page Layout: Repetition Levels - Definition Levels - encoded values.
+      // Levels are encoded as rle or bit-packed.
+      // Init repetition levels
+      if (descr_->max_repetition_level() > 0) {
+        int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
+            page->repetition_level_encoding(), descr_->max_repetition_level(),
+            static_cast<int>(num_buffered_values_), buffer);
+        buffer += rep_levels_bytes;
+        data_size -= rep_levels_bytes;
+      }
+      // TODO figure a way to set max_definition_level_ to 0
+      // if the initial value is invalid
+
+      // Init definition levels
+      if (descr_->max_definition_level() > 0) {
+        int64_t def_levels_bytes = definition_level_decoder_.SetData(
+            page->definition_level_encoding(), descr_->max_definition_level(),
+            static_cast<int>(num_buffered_values_), buffer);
+        buffer += def_levels_bytes;
+        data_size -= def_levels_bytes;
+      }
+
+      // Get a decoder object for this page or create a new decoder if this is the
+      // first page with this encoding.
+      Encoding::type encoding = page->encoding();
+
+      // Dictionary index encodings are normalized to RLE_DICTIONARY, matching
+      // the key used by ConfigureDictionary
+      if (IsDictionaryIndexEncoding(encoding)) {
+        encoding = Encoding::RLE_DICTIONARY;
+      }
+
+      auto it = decoders_.find(static_cast<int>(encoding));
+      if (it != decoders_.end()) {
+        if (encoding == Encoding::RLE_DICTIONARY) {
+          DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
+        }
+        current_decoder_ = it->second.get();
+      } else {
+        switch (encoding) {
+          case Encoding::PLAIN: {
+            std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
+            decoders_[static_cast<int>(encoding)] = decoder;
+            current_decoder_ = decoder.get();
+            break;
+          }
+          case Encoding::RLE_DICTIONARY:
+            throw ParquetException("Dictionary page must be before data page.");
+
+          case Encoding::DELTA_BINARY_PACKED:
+          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+          case Encoding::DELTA_BYTE_ARRAY:
+            ParquetException::NYI("Unsupported encoding");
+
+          default:
+            throw ParquetException("Unknown encoding type.");
+        }
+      }
+      current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
+                                static_cast<int>(data_size));
+      return true;
+    } else {
+      // We don't know what this page type is. We're allowed to skip non-data
+      // pages.
+      continue;
+    }
+  }
+  // Unreachable: the loop above only exits via return; kept to satisfy
+  // compilers that cannot prove it
+  return true;
+}
+
+// Factory: instantiate the TypedRecordReader matching the column's physical
+// type, wrapped in the type-erased public RecordReader
+std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
+                                                 MemoryPool* pool) {
+  switch (descr->physical_type()) {
+    case Type::BOOLEAN:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<BooleanType>(descr, pool)));
+    case Type::INT32:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<Int32Type>(descr, pool)));
+    case Type::INT64:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<Int64Type>(descr, pool)));
+    case Type::INT96:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<Int96Type>(descr, pool)));
+    case Type::FLOAT:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<FloatType>(descr, pool)));
+    case Type::DOUBLE:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<DoubleType>(descr, pool)));
+    case Type::BYTE_ARRAY:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<ByteArrayType>(descr, pool)));
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<FLBAType>(descr, pool)));
+    default:
+      DCHECK(false);
+  }
+  // Unreachable code, but suppress compiler warning
+  return nullptr;
+}
+
+// ----------------------------------------------------------------------
+// Implement public API
+//
+// RecordReader is a thin pimpl wrapper: every method below forwards
+// unconditionally to the type-erased RecordReaderImpl created by Make
+
+RecordReader::RecordReader(RecordReaderImpl* impl) { impl_.reset(impl); }
+
+RecordReader::~RecordReader() {}
+
+int64_t RecordReader::ReadRecords(int64_t num_records) {
+  return impl_->ReadRecords(num_records);
+}
+
+void RecordReader::Reset() { return impl_->Reset(); }
+
+void RecordReader::Reserve(int64_t num_values) { impl_->Reserve(num_values); }
+
+const int16_t* RecordReader::def_levels() const { return impl_->def_levels(); }
+
+const int16_t* RecordReader::rep_levels() const { return impl_->rep_levels(); }
+
+const uint8_t* RecordReader::values() const { return impl_->values(); }
+
+std::shared_ptr<PoolBuffer> RecordReader::ReleaseValues() {
+  return impl_->ReleaseValues();
+}
+
+std::shared_ptr<PoolBuffer> RecordReader::ReleaseIsValid() {
+  return impl_->ReleaseIsValid();
+}
+
+::arrow::ArrayBuilder* RecordReader::builder() { return impl_->builder(); }
+
+int64_t RecordReader::values_written() const { return impl_->values_written(); }
+
+int64_t RecordReader::levels_position() const { return impl_->levels_position(); }
+
+int64_t RecordReader::levels_written() const { return impl_->levels_written(); }
+
+int64_t RecordReader::null_count() const { return impl_->null_count(); }
+
+bool RecordReader::nullable_values() const { return impl_->nullable_values(); }
+
+bool RecordReader::HasMoreData() const { return impl_->HasMoreData(); }
+
+void RecordReader::SetPageReader(std::unique_ptr<PageReader> reader) {
+  impl_->SetPageReader(std::move(reader));
+}
+
+}  // namespace internal
+}  // namespace parquet
diff --git a/src/parquet/arrow/record_reader.h b/src/parquet/arrow/record_reader.h
new file mode 100644
index 0000000..8d55f9d
--- /dev/null
+++ b/src/parquet/arrow/record_reader.h
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_RECORD_READER_H
+#define PARQUET_RECORD_READER_H
+
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include <arrow/buffer.h>
+#include <arrow/builder.h>
+#include <arrow/memory_pool.h>
+#include <arrow/util/bit-util.h>
+
+#include "parquet/column_page.h"
+#include "parquet/schema.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+namespace internal {
+
+/// \brief Stateful column reader that delimits semantic records for both flat
+/// and nested columns
+///
+/// \note API EXPERIMENTAL
+/// \since 1.3.0
+class RecordReader {
+ public:
+  // So that we can create subclasses (one per Parquet physical type);
+  // RecordReader itself is a type-erased pimpl wrapper around this
+  class RecordReaderImpl;
+
+  static std::shared_ptr<RecordReader> Make(
+      const ColumnDescriptor* descr,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+  virtual ~RecordReader();
+
+  /// \brief Decoded definition levels
+  const int16_t* def_levels() const;
+
+  /// \brief Decoded repetition levels
+  const int16_t* rep_levels() const;
+
+  /// \brief Decoded values, including nulls, if any
+  const uint8_t* values() const;
+
+  /// \brief Attempt to read indicated number of records from column chunk
+  /// \return number of records read
+  int64_t ReadRecords(int64_t num_records);
+
+  /// \brief Pre-allocate space for data. Results in better flat read performance
+  void Reserve(int64_t num_values);
+
+  /// \brief Clear consumed values and repetition/definition levels as the
+  /// result of calling ReadRecords
+  void Reset();
+
+  /// \brief Transfer ownership of the internal values buffer to the caller
+  std::shared_ptr<PoolBuffer> ReleaseValues();
+  /// \brief Transfer ownership of the internal validity bitmap to the caller
+  std::shared_ptr<PoolBuffer> ReleaseIsValid();
+  /// \brief Builder used to accumulate variable-length (BYTE_ARRAY /
+  /// FIXED_LEN_BYTE_ARRAY) data
+  ::arrow::ArrayBuilder* builder();
+
+  /// \brief Number of values written including nulls (if any)
+  int64_t values_written() const;
+
+  /// \brief Number of definition / repetition levels (from those that have
+  /// been decoded) that have been consumed inside the reader.
+  int64_t levels_position() const;
+
+  /// \brief Number of definition / repetition levels that have been written
+  /// internally in the reader
+  int64_t levels_written() const;
+
+  /// \brief Number of nulls in the leaf
+  int64_t null_count() const;
+
+  /// \brief True if the leaf values are nullable
+  bool nullable_values() const;
+
+  /// \brief Return true if the record reader has more internal data yet to
+  /// process
+  bool HasMoreData() const;
+
+  /// \brief Advance record reader to the next row group
+  /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader
+  void SetPageReader(std::unique_ptr<PageReader> reader);
+
+ private:
+  std::unique_ptr<RecordReaderImpl> impl_;
+  // Constructible only through the Make factory above
+  explicit RecordReader(RecordReaderImpl* impl);
+};
+
+}  // namespace internal
+}  // namespace parquet
+
+#endif  // PARQUET_RECORD_READER_H
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 87e5b38..e16a1af 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -49,14 +49,14 @@
 const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
 const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
 
-TypePtr MakeDecimalType(const PrimitiveNode* node) {
-  int precision = node->decimal_metadata().precision;
-  int scale = node->decimal_metadata().scale;
+TypePtr MakeDecimalType(const PrimitiveNode& node) {
+  int precision = node.decimal_metadata().precision;
+  int scale = node.decimal_metadata().scale;
   return std::make_shared<::arrow::DecimalType>(precision, scale);
 }
 
-static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) {
-  switch (node->logical_type()) {
+static Status FromByteArray(const PrimitiveNode& node, TypePtr* out) {
+  switch (node.logical_type()) {
     case LogicalType::UTF8:
       *out = ::arrow::utf8();
       break;
@@ -71,17 +71,17 @@
   return Status::OK();
 }
 
-static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
-  switch (node->logical_type()) {
+static Status FromFLBA(const PrimitiveNode& node, TypePtr* out) {
+  switch (node.logical_type()) {
     case LogicalType::NONE:
-      *out = ::arrow::fixed_size_binary(node->type_length());
+      *out = ::arrow::fixed_size_binary(node.type_length());
       break;
     case LogicalType::DECIMAL:
       *out = MakeDecimalType(node);
       break;
     default:
       std::stringstream ss;
-      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+      ss << "Unhandled logical type " << LogicalTypeToString(node.logical_type())
          << " for fixed-length binary array";
       return Status::NotImplemented(ss.str());
       break;
@@ -90,8 +90,8 @@
   return Status::OK();
 }
 
-static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
-  switch (node->logical_type()) {
+static Status FromInt32(const PrimitiveNode& node, TypePtr* out) {
+  switch (node.logical_type()) {
     case LogicalType::NONE:
       *out = ::arrow::int32();
       break;
@@ -124,7 +124,7 @@
       break;
     default:
       std::stringstream ss;
-      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+      ss << "Unhandled logical type " << LogicalTypeToString(node.logical_type())
          << " for INT32";
       return Status::NotImplemented(ss.str());
       break;
@@ -132,8 +132,8 @@
   return Status::OK();
 }
 
-static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
-  switch (node->logical_type()) {
+static Status FromInt64(const PrimitiveNode& node, TypePtr* out) {
+  switch (node.logical_type()) {
     case LogicalType::NONE:
       *out = ::arrow::int64();
       break;
@@ -157,7 +157,7 @@
       break;
     default:
       std::stringstream ss;
-      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+      ss << "Unhandled logical type " << LogicalTypeToString(node.logical_type())
          << " for INT64";
       return Status::NotImplemented(ss.str());
       break;
@@ -165,13 +165,13 @@
   return Status::OK();
 }
 
-Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
-  if (primitive->logical_type() == LogicalType::NA) {
+Status FromPrimitive(const PrimitiveNode& primitive, TypePtr* out) {
+  if (primitive.logical_type() == LogicalType::NA) {
     *out = ::arrow::null();
     return Status::OK();
   }
 
-  switch (primitive->physical_type()) {
+  switch (primitive.physical_type()) {
     case ParquetType::BOOLEAN:
       *out = ::arrow::boolean();
       break;
@@ -201,33 +201,33 @@
 }
 
 // Forward declaration
-Status NodeToFieldInternal(const NodePtr& node,
-                           const std::unordered_set<NodePtr>* included_leaf_nodes,
+Status NodeToFieldInternal(const Node& node,
+                           const std::unordered_set<const Node*>* included_leaf_nodes,
                            std::shared_ptr<Field>* out);
 
 /*
  * Auxilary function to test if a parquet schema node is a leaf node
  * that should be included in a resulting arrow schema
  */
-inline bool IsIncludedLeaf(const NodePtr& node,
-                           const std::unordered_set<NodePtr>* included_leaf_nodes) {
+inline bool IsIncludedLeaf(const Node& node,
+                           const std::unordered_set<const Node*>* included_leaf_nodes) {
   if (included_leaf_nodes == nullptr) {
     return true;
   }
-  auto search = included_leaf_nodes->find(node);
+  auto search = included_leaf_nodes->find(&node);
   return (search != included_leaf_nodes->end());
 }
 
-Status StructFromGroup(const GroupNode* group,
-                       const std::unordered_set<NodePtr>* included_leaf_nodes,
+Status StructFromGroup(const GroupNode& group,
+                       const std::unordered_set<const Node*>* included_leaf_nodes,
                        TypePtr* out) {
   std::vector<std::shared_ptr<Field>> fields;
   std::shared_ptr<Field> field;
 
   *out = nullptr;
 
-  for (int i = 0; i < group->field_count(); i++) {
-    RETURN_NOT_OK(NodeToFieldInternal(group->field(i), included_leaf_nodes, &field));
+  for (int i = 0; i < group.field_count(); i++) {
+    RETURN_NOT_OK(NodeToFieldInternal(*group.field(i), included_leaf_nodes, &field));
     if (field != nullptr) {
       fields.push_back(field);
     }
@@ -238,22 +238,23 @@
   return Status::OK();
 }
 
-Status NodeToList(const GroupNode* group,
-                  const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) {
+Status NodeToList(const GroupNode& group,
+                  const std::unordered_set<const Node*>* included_leaf_nodes,
+                  TypePtr* out) {
   *out = nullptr;
-  if (group->field_count() == 1) {
+  if (group.field_count() == 1) {
     // This attempts to resolve the preferred 3-level list encoding.
-    NodePtr list_node = group->field(0);
-    if (list_node->is_group() && list_node->is_repeated()) {
-      const GroupNode* list_group = static_cast<const GroupNode*>(list_node.get());
+    const Node& list_node = *group.field(0);
+    if (list_node.is_group() && list_node.is_repeated()) {
+      const auto& list_group = static_cast<const GroupNode&>(list_node);
       // Special case mentioned in the format spec:
       //   If the name is array or ends in _tuple, this should be a list of struct
       //   even for single child elements.
-      if (list_group->field_count() == 1 && !HasStructListName(*list_group)) {
+      if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
         // List of primitive type
         std::shared_ptr<Field> item_field;
         RETURN_NOT_OK(
-            NodeToFieldInternal(list_group->field(0), included_leaf_nodes, &item_field));
+            NodeToFieldInternal(*list_group.field(0), included_leaf_nodes, &item_field));
 
         if (item_field != nullptr) {
           *out = ::arrow::list(item_field);
@@ -263,18 +264,17 @@
         std::shared_ptr<::arrow::DataType> inner_type;
         RETURN_NOT_OK(StructFromGroup(list_group, included_leaf_nodes, &inner_type));
         if (inner_type != nullptr) {
-          auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
+          auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
           *out = ::arrow::list(item_field);
         }
       }
-    } else if (list_node->is_repeated()) {
+    } else if (list_node.is_repeated()) {
       // repeated primitive node
       std::shared_ptr<::arrow::DataType> inner_type;
-      if (IsIncludedLeaf(static_cast<NodePtr>(list_node), included_leaf_nodes)) {
-        const PrimitiveNode* primitive =
-            static_cast<const PrimitiveNode*>(list_node.get());
-        RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
-        auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
+      if (IsIncludedLeaf(static_cast<const Node&>(list_node), included_leaf_nodes)) {
+        RETURN_NOT_OK(
+            FromPrimitive(static_cast<const PrimitiveNode&>(list_node), &inner_type));
+        auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
         *out = ::arrow::list(item_field);
       }
     } else {
@@ -288,49 +288,47 @@
   return Status::OK();
 }
 
-Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
+Status NodeToField(const Node& node, std::shared_ptr<Field>* out) {
   return NodeToFieldInternal(node, nullptr, out);
 }
 
-Status NodeToFieldInternal(const NodePtr& node,
-                           const std::unordered_set<NodePtr>* included_leaf_nodes,
+Status NodeToFieldInternal(const Node& node,
+                           const std::unordered_set<const Node*>* included_leaf_nodes,
                            std::shared_ptr<Field>* out) {
   std::shared_ptr<::arrow::DataType> type = nullptr;
-  bool nullable = !node->is_required();
+  bool nullable = !node.is_required();
 
   *out = nullptr;
 
-  if (node->is_repeated()) {
+  if (node.is_repeated()) {
     // 1-level LIST encoding fields are required
     std::shared_ptr<::arrow::DataType> inner_type;
-    if (node->is_group()) {
-      const GroupNode* group = static_cast<const GroupNode*>(node.get());
-      RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &inner_type));
-    } else if (IsIncludedLeaf(static_cast<NodePtr>(node), included_leaf_nodes)) {
-      const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
-      RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
+    if (node.is_group()) {
+      RETURN_NOT_OK(StructFromGroup(static_cast<const GroupNode&>(node),
+                                    included_leaf_nodes, &inner_type));
+    } else if (IsIncludedLeaf(node, included_leaf_nodes)) {
+      RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &inner_type));
     }
     if (inner_type != nullptr) {
-      auto item_field = std::make_shared<Field>(node->name(), inner_type, false);
+      auto item_field = std::make_shared<Field>(node.name(), inner_type, false);
       type = ::arrow::list(item_field);
       nullable = false;
     }
-  } else if (node->is_group()) {
-    const GroupNode* group = static_cast<const GroupNode*>(node.get());
-    if (node->logical_type() == LogicalType::LIST) {
+  } else if (node.is_group()) {
+    const auto& group = static_cast<const GroupNode&>(node);
+    if (node.logical_type() == LogicalType::LIST) {
       RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type));
     } else {
       RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type));
     }
   } else {
     // Primitive (leaf) node
-    if (IsIncludedLeaf(static_cast<NodePtr>(node), included_leaf_nodes)) {
-      const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
-      RETURN_NOT_OK(FromPrimitive(primitive, &type));
+    if (IsIncludedLeaf(node, included_leaf_nodes)) {
+      RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &type));
     }
   }
   if (type != nullptr) {
-    *out = std::make_shared<Field>(node->name(), type, nullable);
+    *out = std::make_shared<Field>(node.name(), type, nullable);
   }
   return Status::OK();
 }
@@ -339,12 +337,12 @@
     const SchemaDescriptor* parquet_schema,
     const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
     std::shared_ptr<::arrow::Schema>* out) {
-  const GroupNode* schema_node = parquet_schema->group_node();
+  const GroupNode& schema_node = *parquet_schema->group_node();
 
-  int num_fields = static_cast<int>(schema_node->field_count());
+  int num_fields = static_cast<int>(schema_node.field_count());
   std::vector<std::shared_ptr<Field>> fields(num_fields);
   for (int i = 0; i < num_fields; i++) {
-    RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i]));
+    RETURN_NOT_OK(NodeToField(*schema_node.field(i), &fields[i]));
   }
 
   *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
@@ -362,13 +360,13 @@
   // Index in column_indices should be unique, duplicate indices are merged into one and
   // ordering by its first appearing.
   int num_columns = static_cast<int>(column_indices.size());
-  std::unordered_set<NodePtr> top_nodes;  // to deduplicate the top nodes
-  std::vector<NodePtr> base_nodes;        // to keep the ordering
-  std::unordered_set<NodePtr> included_leaf_nodes(num_columns);
+  std::unordered_set<const Node*> top_nodes;  // to deduplicate the top nodes
+  std::vector<const Node*> base_nodes;        // to keep the ordering
+  std::unordered_set<const Node*> included_leaf_nodes(num_columns);
   for (int i = 0; i < num_columns; i++) {
-    auto column_desc = parquet_schema->Column(column_indices[i]);
-    included_leaf_nodes.insert(column_desc->schema_node());
-    auto column_root = parquet_schema->GetColumnRoot(column_indices[i]);
+    const ColumnDescriptor* column_desc = parquet_schema->Column(column_indices[i]);
+    included_leaf_nodes.insert(column_desc->schema_node().get());
+    const Node* column_root = parquet_schema->GetColumnRoot(column_indices[i]);
     auto insertion = top_nodes.insert(column_root);
     if (insertion.second) {
       base_nodes.push_back(column_root);
@@ -378,7 +376,7 @@
   std::vector<std::shared_ptr<Field>> fields;
   std::shared_ptr<Field> field;
   for (auto node : base_nodes) {
-    RETURN_NOT_OK(NodeToFieldInternal(node, &included_leaf_nodes, &field));
+    RETURN_NOT_OK(NodeToFieldInternal(*node, &included_leaf_nodes, &field));
     if (field != nullptr) {
       fields.push_back(field);
     }
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index 73e48f2..de153eb 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -37,8 +37,9 @@
 
 namespace arrow {
 
-::arrow::Status PARQUET_EXPORT NodeToField(const schema::NodePtr& node,
-                                           std::shared_ptr<::arrow::Field>* out);
+PARQUET_EXPORT
+::arrow::Status NodeToField(const schema::Node& node,
+                            std::shared_ptr<::arrow::Field>* out);
 
 /// Convert parquet schema to arrow schema with selected indices
 /// \param parquet_schema to be converted
@@ -48,16 +49,18 @@
 /// \param key_value_metadata optional metadata, can be nullptr
 /// \param out the corresponding arrow schema
 /// \return Status::OK() on a successful conversion.
-::arrow::Status PARQUET_EXPORT FromParquetSchema(
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(
     const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
     const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
     std::shared_ptr<::arrow::Schema>* out);
 
 // Without indices
-::arrow::Status PARQUET_EXPORT
-FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                  const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
-                  std::shared_ptr<::arrow::Schema>* out);
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(
+    const SchemaDescriptor* parquet_schema,
+    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
+    std::shared_ptr<::arrow::Schema>* out);
 
 // Without metadata
 ::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index 5f6259f..d23e738 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -21,7 +21,10 @@
 #include <cstdint>
 #include <memory>
 
-#include "arrow/util/rle-encoding.h"
+#include <arrow/buffer.h>
+#include <arrow/memory_pool.h>
+#include <arrow/util/bit-util.h>
+#include <arrow/util/rle-encoding.h>
 
 #include "parquet/column_page.h"
 #include "parquet/encoding-internal.h"
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index df7deb8..6172365 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -19,6 +19,7 @@
 #define PARQUET_COLUMN_READER_H
 
 #include <algorithm>
+#include <climits>
 #include <cstdint>
 #include <cstring>
 #include <iostream>
@@ -26,6 +27,9 @@
 #include <unordered_map>
 #include <vector>
 
+#include <arrow/buffer.h>
+#include <arrow/builder.h>
+#include <arrow/memory_pool.h>
 #include <arrow/util/bit-util.h>
 
 #include "parquet/column_page.h"
@@ -45,6 +49,8 @@
 
 namespace parquet {
 
+namespace BitUtil = ::arrow::BitUtil;
+
 class PARQUET_EXPORT LevelDecoder {
  public:
   LevelDecoder();
@@ -104,6 +110,12 @@
   // Returns the number of decoded repetition levels
   int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels);
 
+  int64_t available_values_current_page() const {
+    return num_buffered_values_ - num_decoded_values_;
+  }
+
+  void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
+
   const ColumnDescriptor* descr_;
 
   std::unique_ptr<PageReader> pager_;
@@ -130,7 +142,58 @@
   ::arrow::MemoryPool* pool_;
 };
 
-// API to read values from a single column. This is the main client facing API.
+namespace internal {
+
+static inline void DefinitionLevelsToBitmap(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, const int64_t valid_bits_offset) {
+  int64_t byte_offset = valid_bits_offset / 8;
+  int64_t bit_offset = valid_bits_offset % 8;
+  uint8_t bitset = valid_bits[byte_offset];
+
+  // TODO(itaiin): As an interim solution we are splitting the code path here
+  // between repeated+flat column reads, and non-repeated+nested reads.
+  // Those paths need to be merged in the future
+  for (int i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level) {
+      bitset |= (1 << bit_offset);
+    } else if (max_repetition_level > 0) {
+      // repetition+flat case
+      if (def_levels[i] == (max_definition_level - 1)) {
+        bitset &= ~(1 << bit_offset);
+        *null_count += 1;
+      } else {
+        continue;
+      }
+    } else {
+      // non-repeated+nested case
+      if (def_levels[i] < max_definition_level) {
+        bitset &= ~(1 << bit_offset);
+        *null_count += 1;
+      } else {
+        throw ParquetException("definition level exceeds maximum");
+      }
+    }
+
+    bit_offset++;
+    if (bit_offset == CHAR_BIT) {
+      bit_offset = 0;
+      valid_bits[byte_offset] = bitset;
+      byte_offset++;
+      // TODO: Except for the last byte, this shouldn't be needed
+      bitset = valid_bits[byte_offset];
+    }
+  }
+  if (bit_offset != 0) {
+    valid_bits[byte_offset] = bitset;
+  }
+  *values_read = bit_offset + byte_offset * 8 - valid_bits_offset;
+}
+
+}  // namespace internal
+
+// API to read values from a single column. This is the main client-facing API.
 template <typename DType>
 class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
  public:
@@ -138,7 +201,7 @@
 
   TypedColumnReader(const ColumnDescriptor* schema, std::unique_ptr<PageReader> pager,
                     ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
-      : ColumnReader(schema, std::move(pager), pool), current_decoder_(NULL) {}
+      : ColumnReader(schema, std::move(pager), pool), current_decoder_(nullptr) {}
   virtual ~TypedColumnReader() {}
 
   // Read a batch of repetition levels, definition levels, and values from the
@@ -208,7 +271,7 @@
   typedef Decoder<DType> DecoderType;
 
   // Advance to the next data page
-  virtual bool ReadNewPage();
+  bool ReadNewPage() override;
 
   // Read up to batch_size values from the current data page into the
   // pre-allocated memory T*
@@ -221,7 +284,7 @@
   // to the def_levels.
   //
   // @returns: the number of values read into the out buffer
-  int64_t ReadValuesSpaced(int64_t batch_size, T* out, int null_count,
+  int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count,
                            uint8_t* valid_bits, int64_t valid_bits_offset);
 
   // Map of encoding type to the respective decoder object. For example, a
@@ -234,6 +297,9 @@
   DecoderType* current_decoder_;
 };
 
+// ----------------------------------------------------------------------
+// Type column reader implementations
+
 template <typename DType>
 inline int64_t TypedColumnReader<DType>::ReadValues(int64_t batch_size, T* out) {
   int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size));
@@ -242,11 +308,12 @@
 
 template <typename DType>
 inline int64_t TypedColumnReader<DType>::ReadValuesSpaced(int64_t batch_size, T* out,
-                                                          int null_count,
+                                                          int64_t null_count,
                                                           uint8_t* valid_bits,
                                                           int64_t valid_bits_offset) {
-  return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size), null_count,
-                                        valid_bits, valid_bits_offset);
+  return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size),
+                                        static_cast<int>(null_count), valid_bits,
+                                        valid_bits_offset);
 }
 
 template <typename DType>
@@ -294,59 +361,34 @@
 
   *values_read = ReadValues(values_to_read, values);
   int64_t total_values = std::max(num_def_levels, *values_read);
-  num_decoded_values_ += total_values;
+  ConsumeBufferedValues(total_values);
 
   return total_values;
 }
 
-inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
-                                     int16_t max_definition_level,
-                                     int16_t max_repetition_level, int64_t* values_read,
-                                     int64_t* null_count, uint8_t* valid_bits,
-                                     int64_t valid_bits_offset) {
-  int byte_offset = static_cast<int>(valid_bits_offset) / 8;
-  int bit_offset = static_cast<int>(valid_bits_offset) % 8;
-  uint8_t bitset = valid_bits[byte_offset];
+namespace internal {
 
-  // TODO(itaiin): As an interim solution we are splitting the code path here
-  // between repeated+flat column reads, and non-repeated+nested reads.
-  // Those paths need to be merged in the future
-  for (int i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level) {
-      bitset |= (1 << bit_offset);
-    } else if (max_repetition_level > 0) {
-      // repetition+flat case
-      if (def_levels[i] == (max_definition_level - 1)) {
-        bitset &= ~(1 << bit_offset);
-        *null_count += 1;
-      } else {
-        continue;
+// TODO(itaiin): another code path split to merge when the general case is done
+static inline bool HasSpacedValues(const ColumnDescriptor* descr) {
+  if (descr->max_repetition_level() > 0) {
+    // repeated+flat case
+    return !descr->schema_node()->is_required();
+  } else {
+    // non-repeated+nested case
+    // Find if a node forces nulls in the lowest level along the hierarchy
+    const schema::Node* node = descr->schema_node().get();
+    while (node) {
+      if (node->is_optional()) {
+        return true;
       }
-    } else {
-      // non-repeated+nested case
-      if (def_levels[i] < max_definition_level) {
-        bitset &= ~(1 << bit_offset);
-        *null_count += 1;
-      } else {
-        throw ParquetException("definition level exceeds maximum");
-      }
+      node = node->parent();
     }
-
-    bit_offset++;
-    if (bit_offset == 8) {
-      bit_offset = 0;
-      valid_bits[byte_offset] = bitset;
-      byte_offset++;
-      // TODO: Except for the last byte, this shouldn't be needed
-      bitset = valid_bits[byte_offset];
-    }
+    return false;
   }
-  if (bit_offset != 0) {
-    valid_bits[byte_offset] = bitset;
-  }
-  *values_read = (bit_offset + byte_offset * 8 - valid_bits_offset);
 }
 
+}  // namespace internal
+
 template <typename DType>
 inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(
     int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
@@ -377,25 +419,7 @@
       }
     }
 
-    // TODO(itaiin): another code path split to merge when the general case is done
-    bool has_spaced_values;
-    if (descr_->max_repetition_level() > 0) {
-      // repeated+flat case
-      has_spaced_values = !descr_->schema_node()->is_required();
-    } else {
-      // non-repeated+nested case
-      // Find if a node forces nulls in the lowest level along the hierarchy
-      const schema::Node* node = descr_->schema_node().get();
-      has_spaced_values = false;
-      while (node) {
-        auto parent = node->parent();
-        if (node->is_optional()) {
-          has_spaced_values = true;
-          break;
-        }
-        node = parent;
-      }
-    }
+    const bool has_spaced_values = internal::HasSpacedValues(descr_);
 
     int64_t null_count = 0;
     if (!has_spaced_values) {
@@ -413,9 +437,9 @@
     } else {
       int16_t max_definition_level = descr_->max_definition_level();
       int16_t max_repetition_level = descr_->max_repetition_level();
-      DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
-                               max_repetition_level, values_read, &null_count, valid_bits,
-                               valid_bits_offset);
+      internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
+                                         max_repetition_level, values_read, &null_count,
+                                         valid_bits, valid_bits_offset);
       total_values = ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
                                       valid_bits, valid_bits_offset);
     }
@@ -432,12 +456,12 @@
     *levels_read = total_values;
   }
 
-  num_decoded_values_ += *levels_read;
+  ConsumeBufferedValues(*levels_read);
   return total_values;
 }
 
 template <typename DType>
-inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
+int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
   int64_t rows_to_skip = num_rows_to_skip;
   while (HasNext() && rows_to_skip > 0) {
     // If the number of rows to skip is more than the number of undecoded values, skip the
@@ -472,6 +496,9 @@
   return num_rows_to_skip - rows_to_skip;
 }
 
+// ----------------------------------------------------------------------
+// Template instantiations
+
 typedef TypedColumnReader<BooleanType> BoolReader;
 typedef TypedColumnReader<Int32Type> Int32Reader;
 typedef TypedColumnReader<Int64Type> Int64Reader;
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 48f243e..47b86e3 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -491,7 +491,7 @@
   auto writer = this->BuildWriter(LARGE_SIZE);
   // All values being written are NULL
   writer->WriteBatch(this->values_.size(), definition_levels.data(),
-                     repetition_levels.data(), NULL);
+                     repetition_levels.data(), nullptr);
   writer->Close();
 
   // Just read the first SMALL_SIZE rows to ensure we could read it back in
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index fab1f63..5818fd3 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -54,7 +54,7 @@
   using Decoder<DType>::num_values_;
 
   explicit PlainDecoder(const ColumnDescriptor* descr)
-      : Decoder<DType>(descr, Encoding::PLAIN), data_(NULL), len_(0) {
+      : Decoder<DType>(descr, Encoding::PLAIN), data_(nullptr), len_(0) {
     if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
       type_length_ = descr_->type_length();
     } else {
@@ -943,7 +943,7 @@
     for (int i = 0; i < max_values; ++i) {
       int prefix_len = 0;
       prefix_len_decoder_.Decode(&prefix_len, 1);
-      ByteArray suffix = {0, NULL};
+      ByteArray suffix = {0, nullptr};
       suffix_decoder_.Decode(&suffix, 1);
       buffer[i].len = prefix_len + suffix.len;
 
diff --git a/src/parquet/encoding.h b/src/parquet/encoding.h
index 339eb35..e7ed415 100644
--- a/src/parquet/encoding.h
+++ b/src/parquet/encoding.h
@@ -113,6 +113,10 @@
       throw ParquetException("Number of values / definition_levels read did not match");
     }
 
+    // Depending on the number of nulls, some of the value slots in buffer may
+    // be uninitialized, and this will cause valgrind warnings / potentially UB
+    memset(buffer + values_read, 0, (num_values - values_read) * sizeof(T));
+
     // Add spacing for null entries. As we have filled the buffer from the front,
     // we need to add the spacing from the back.
     int values_to_move = values_read;
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 0d8e10e..7f990e0 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -91,7 +91,7 @@
   // API convenience to get a MetaData accessor
   static std::unique_ptr<ColumnChunkMetaData> Make(
       const uint8_t* metadata, const ColumnDescriptor* descr,
-      const ApplicationVersion* writer_version = NULL);
+      const ApplicationVersion* writer_version = nullptr);
 
   ~ColumnChunkMetaData();
 
@@ -116,7 +116,7 @@
 
  private:
   explicit ColumnChunkMetaData(const uint8_t* metadata, const ColumnDescriptor* descr,
-                               const ApplicationVersion* writer_version = NULL);
+                               const ApplicationVersion* writer_version = nullptr);
   // PIMPL Idiom
   class ColumnChunkMetaDataImpl;
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
@@ -127,7 +127,7 @@
   // API convenience to get a MetaData accessor
   static std::unique_ptr<RowGroupMetaData> Make(
       const uint8_t* metadata, const SchemaDescriptor* schema,
-      const ApplicationVersion* writer_version = NULL);
+      const ApplicationVersion* writer_version = nullptr);
 
   ~RowGroupMetaData();
 
@@ -141,7 +141,7 @@
 
  private:
   explicit RowGroupMetaData(const uint8_t* metadata, const SchemaDescriptor* schema,
-                            const ApplicationVersion* writer_version = NULL);
+                            const ApplicationVersion* writer_version = nullptr);
   // PIMPL Idiom
   class RowGroupMetaDataImpl;
   std::unique_ptr<RowGroupMetaDataImpl> impl_;
diff --git a/src/parquet/file/printer.cc b/src/parquet/file/printer.cc
index 2ba9474..727552d 100644
--- a/src/parquet/file/printer.cc
+++ b/src/parquet/file/printer.cc
@@ -109,7 +109,7 @@
     char buffer[bufsize];
 
     // Create readers for selected columns and print contents
-    vector<std::shared_ptr<Scanner>> scanners(selected_columns.size(), NULL);
+    vector<std::shared_ptr<Scanner>> scanners(selected_columns.size(), nullptr);
     int j = 0;
     for (auto i : selected_columns) {
       std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 5ff7398..bc14ec9 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -100,7 +100,7 @@
     }
 
     // Uncompress it if we need to
-    if (decompressor_ != NULL) {
+    if (decompressor_ != nullptr) {
       // Grow the uncompressed buffer if we need to.
       if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
         PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 26876fc..9b9bde9 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -56,6 +56,13 @@
       const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
 }
 
+std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
+  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+                                        << metadata()->num_columns()
+                                        << " columns, requested column: " << i;
+  return contents_->GetColumnPageReader(i);
+}
+
 // Returns the rowgroup metadata
 const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
 
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index 0467640..7558394 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -58,6 +58,8 @@
   // column. Ownership is shared with the RowGroupReader.
   std::shared_ptr<ColumnReader> Column(int i);
 
+  std::unique_ptr<PageReader> GetColumnPageReader(int i);
+
  private:
   // Holds a pointer to an instance of Contents implementation
   std::unique_ptr<Contents> contents_;
diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc
index faacb76..f806c12 100644
--- a/src/parquet/schema-test.cc
+++ b/src/parquet/schema-test.cc
@@ -125,17 +125,17 @@
   ASSERT_EQ(LogicalType::UTF8, node2.logical_type());
 
   // repetition
-  node1 = PrimitiveNode("foo", Repetition::REQUIRED, Type::INT32);
-  node2 = PrimitiveNode("foo", Repetition::OPTIONAL, Type::INT32);
   PrimitiveNode node3("foo", Repetition::REPEATED, Type::INT32);
-
-  ASSERT_TRUE(node1.is_required());
-
-  ASSERT_TRUE(node2.is_optional());
-  ASSERT_FALSE(node2.is_required());
+  PrimitiveNode node4("foo", Repetition::REQUIRED, Type::INT32);
+  PrimitiveNode node5("foo", Repetition::OPTIONAL, Type::INT32);
 
   ASSERT_TRUE(node3.is_repeated());
   ASSERT_FALSE(node3.is_optional());
+
+  ASSERT_TRUE(node4.is_required());
+
+  ASSERT_TRUE(node5.is_optional());
+  ASSERT_FALSE(node5.is_required());
 }
 
 TEST_F(TestPrimitiveNode, FromParquet) {
@@ -687,10 +687,10 @@
   ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien.get()) < 0);
   ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar.get()) < 0);
 
-  ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0).get());
-  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3).get());
-  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4).get());
-  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(5).get());
+  ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0));
+  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3));
+  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4));
+  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(5));
 
   ASSERT_EQ(schema.get(), descr_.group_node());
 
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index ddd8ac1..8168dc4 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -20,6 +20,8 @@
 
 #include <algorithm>
 #include <memory>
+#include <sstream>
+#include <string>
 
 #include "parquet/exception.h"
 #include "parquet/parquet_types.h"
@@ -701,9 +703,15 @@
   return result;
 }
 
-const schema::NodePtr& SchemaDescriptor::GetColumnRoot(int i) const {
+const schema::Node* SchemaDescriptor::GetColumnRoot(int i) const {
   DCHECK(i >= 0 && i < static_cast<int>(leaves_.size()));
-  return leaf_to_base_.find(i)->second;
+  return leaf_to_base_.find(i)->second.get();
+}
+
+std::string SchemaDescriptor::ToString() const {
+  std::ostringstream ss;
+  PrintSchema(schema_.get(), ss);
+  return ss.str();
 }
 
 int ColumnDescriptor::type_scale() const {
diff --git a/src/parquet/schema.h b/src/parquet/schema.h
index c6b7fbe..f93f0db 100644
--- a/src/parquet/schema.h
+++ b/src/parquet/schema.h
@@ -178,6 +178,9 @@
 
   bool EqualsInternal(const Node* other) const;
   void SetParent(const Node* p_parent);
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Node);
 };
 
 // Save our breath all over the place with these typedefs
@@ -255,7 +258,7 @@
 
   bool Equals(const Node* other) const override;
 
-  const NodePtr& field(int i) const { return fields_[i]; }
+  NodePtr field(int i) const { return fields_[i]; }
   int FieldIndex(const std::string& name) const;
   int FieldIndex(const Node& node) const;
 
@@ -398,10 +401,12 @@
   const schema::GroupNode* group_node() const { return group_node_; }
 
   // Returns the root (child of the schema root) node of the leaf(column) node
-  const schema::NodePtr& GetColumnRoot(int i) const;
+  const schema::Node* GetColumnRoot(int i) const;
 
   const std::string& name() const { return group_node_->name(); }
 
+  std::string ToString() const;
+
  private:
   friend class ColumnDescriptor;
 
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index d3ec942..1521cbd 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -194,8 +194,9 @@
 }
 
 template <typename TestType>
-typename std::vector<typename TestType::c_type> TestRowGroupStatistics<
-    TestType>::GetDeepCopy(const std::vector<typename TestType::c_type>& values) {
+typename std::vector<typename TestType::c_type>
+TestRowGroupStatistics<TestType>::GetDeepCopy(
+    const std::vector<typename TestType::c_type>& values) {
   return values;
 }
 
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
index 3fd72f2..0698652 100644
--- a/src/parquet/test-util.h
+++ b/src/parquet/test-util.h
@@ -377,7 +377,7 @@
     }
     shared_ptr<DataPage> page = MakeDataPage<Type>(
         d, slice(values, value_start, value_start + values_per_page[i]),
-        values_per_page[i], encoding, NULL, 0,
+        values_per_page[i], encoding, nullptr, 0,
         slice(def_levels, def_level_start, def_level_end), max_def_level,
         slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
     pages.push_back(page);
diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc
index cdb6d21..16617a7 100644
--- a/src/parquet/util/memory-test.cc
+++ b/src/parquet/util/memory-test.cc
@@ -214,7 +214,7 @@
 
   for (int i = 0; i < num_allocs; ++i) {
     uint8_t* mem = p.Allocate(alloc_size);
-    ASSERT_TRUE(mem != NULL);
+    ASSERT_TRUE(mem != nullptr);
     total_allocated += alloc_size;
 
     int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
@@ -239,7 +239,7 @@
   for (int i = 0; i < num_allocs; ++i) {
     int alloc_size = i % 2 == 0 ? 1 : ChunkedAllocatorTest::MAX_CHUNK_SIZE;
     uint8_t* mem = p.Allocate(alloc_size);
-    ASSERT_TRUE(mem != NULL);
+    ASSERT_TRUE(mem != nullptr);
     total_allocated += alloc_size;
 
     int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc
index d3a5226..3aa2570 100644
--- a/src/parquet/util/memory.cc
+++ b/src/parquet/util/memory.cc
@@ -121,14 +121,18 @@
 
 template <bool CHECK_LIMIT_FIRST>
 uint8_t* ChunkedAllocator::Allocate(int size) {
-  if (size == 0) return NULL;
+  if (size == 0) {
+    return nullptr;
+  }
 
   int64_t num_bytes = ::arrow::BitUtil::RoundUp(size, 8);
   if (current_chunk_idx_ == -1 ||
       num_bytes + chunks_[current_chunk_idx_].allocated_bytes >
           chunks_[current_chunk_idx_].size) {
-    // If we couldn't allocate a new chunk, return NULL.
-    if (ARROW_PREDICT_FALSE(!FindChunk(num_bytes))) return NULL;
+    // If we couldn't allocate a new chunk, return nullptr.
+    if (ARROW_PREDICT_FALSE(!FindChunk(num_bytes))) {
+      return nullptr;
+    }
   }
   ChunkInfo& info = chunks_[current_chunk_idx_];
   uint8_t* result = info.data + info.allocated_bytes;
@@ -195,7 +199,7 @@
     // Allocate a new chunk. Return early if malloc fails.
     uint8_t* buf = nullptr;
     PARQUET_THROW_NOT_OK(pool_->Allocate(chunk_size, &buf));
-    if (ARROW_PREDICT_FALSE(buf == NULL)) {
+    if (ARROW_PREDICT_FALSE(buf == nullptr)) {
       DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
       current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
       return false;
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index 04dcca4..94b86c1 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -188,7 +188,7 @@
 
     explicit ChunkInfo(int64_t size, uint8_t* buf);
 
-    ChunkInfo() : data(NULL), size(0), allocated_bytes(0) {}
+    ChunkInfo() : data(nullptr), size(0), allocated_bytes(0) {}
   };
 
   /// chunk from which we served the last Allocate() call;
diff --git a/src/parquet/util/schema-util.h b/src/parquet/util/schema-util.h
index 9187962..ef9087b 100644
--- a/src/parquet/util/schema-util.h
+++ b/src/parquet/util/schema-util.h
@@ -25,7 +25,6 @@
 #include "parquet/exception.h"
 #include "parquet/schema.h"
 #include "parquet/types.h"
-#include "parquet/util/logging.h"
 
 using parquet::ParquetException;
 using parquet::SchemaDescriptor;
@@ -49,14 +48,14 @@
 }
 
 // TODO(itaiin): This aux. function is to be deleted once repeated structs are supported
-inline bool IsSimpleStruct(const NodePtr& node) {
+inline bool IsSimpleStruct(const Node* node) {
   if (!node->is_group()) return false;
   if (node->is_repeated()) return false;
   if (node->logical_type() == LogicalType::LIST) return false;
   // Special case mentioned in the format spec:
   //   If the name is array or ends in _tuple, this should be a list of struct
   //   even for single child elements.
-  auto group = static_cast<const GroupNode*>(node.get());
+  auto group = static_cast<const GroupNode*>(node);
   if (group->field_count() == 1 && HasStructListName(*group)) return false;
 
   return true;