PARQUET-1092: Support writing chunked arrow::Table columns

I did quite a bit of refactoring to make this easier / simpler. I think there's some additional work we could do to make the write-path code cleaner, but we should probably wait to do some of that until we implement complete nested data write and read.

I will follow up shortly with some functional tests for ARROW-232 to make sure this works end-to-end on the pyarrow side.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #426 from wesm/PARQUET-1092 and squashes the following commits:

7aa3fab [Wes McKinney] Fix compiler warnings
a72ad12 [Wes McKinney] Test and fix bugs with chunked writes. Ensure that primitive types use right transfer functor on read path
c5006cb [Wes McKinney] Fix and expand date64->date32 conversion unit test
61e364c [Wes McKinney] Compiling again with test failure
c7e6696 [Wes McKinney] More refactoring, chunked writes
9f35818 [Wes McKinney] More refactoring
3e297cc [Wes McKinney] Start refactoring, totally broken
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 02f3751..db12fb4 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -24,7 +24,10 @@
 #include "gtest/gtest.h"
 
 #include <arrow/compute/api.h>
+#include <cstdint>
+#include <functional>
 #include <sstream>
+#include <vector>
 
 #include "parquet/api/reader.h"
 #include "parquet/api/writer.h"
@@ -38,6 +41,7 @@
 
 #include "arrow/api.h"
 #include "arrow/test-util.h"
+#include "arrow/type_traits.h"
 #include "arrow/util/decimal.h"
 
 using arrow::Array;
@@ -45,6 +49,7 @@
 using arrow::Buffer;
 using arrow::ChunkedArray;
 using arrow::Column;
+using arrow::DataType;
 using arrow::ListArray;
 using arrow::PoolBuffer;
 using arrow::PrimitiveArray;
@@ -77,7 +82,7 @@
 
 static constexpr uint32_t kDefaultSeed = 0;
 
-LogicalType::type get_logical_type(const ::arrow::DataType& type) {
+LogicalType::type get_logical_type(const ::DataType& type) {
   switch (type.id()) {
     case ArrowId::UINT8:
       return LogicalType::UINT_8;
@@ -130,7 +135,7 @@
   return LogicalType::NONE;
 }
 
-ParquetType::type get_physical_type(const ::arrow::DataType& type) {
+ParquetType::type get_physical_type(const ::DataType& type) {
   switch (type.id()) {
     case ArrowId::BOOL:
       return ParquetType::BOOLEAN;
@@ -325,74 +330,6 @@
   *out = sink->GetBuffer();
 }
 
-void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
-                       int64_t row_group_size, const std::vector<int>& column_subset,
-                       std::shared_ptr<Table>* out,
-                       const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
-                           default_arrow_writer_properties()) {
-  std::shared_ptr<Buffer> buffer;
-  WriteTableToBuffer(table, num_threads, row_group_size, arrow_properties, &buffer);
-
-  std::unique_ptr<FileReader> reader;
-  ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
-                              ::arrow::default_memory_pool(),
-                              ::parquet::default_reader_properties(), nullptr, &reader));
-
-  reader->set_num_threads(num_threads);
-
-  if (column_subset.size() > 0) {
-    ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
-  } else {
-    // Read everything
-    ASSERT_OK_NO_THROW(reader->ReadTable(out));
-  }
-}
-
-static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::arrow::DataType& type,
-                                                   Repetition::type repetition) {
-  int32_t byte_width = -1;
-  int32_t precision = -1;
-  int32_t scale = -1;
-
-  switch (type.id()) {
-    case ::arrow::Type::DICTIONARY: {
-      const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type);
-      const ::arrow::DataType& values_type = *dict_type.dictionary()->type();
-      switch (values_type.id()) {
-        case ::arrow::Type::FIXED_SIZE_BINARY:
-          byte_width =
-              static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
-          break;
-        case ::arrow::Type::DECIMAL: {
-          const auto& decimal_type =
-              static_cast<const ::arrow::Decimal128Type&>(values_type);
-          precision = decimal_type.precision();
-          scale = decimal_type.scale();
-          byte_width = DecimalSize(precision);
-        } break;
-        default:
-          break;
-      }
-    } break;
-    case ::arrow::Type::FIXED_SIZE_BINARY:
-      byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
-      break;
-    case ::arrow::Type::DECIMAL: {
-      const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
-      precision = decimal_type.precision();
-      scale = decimal_type.scale();
-      byte_width = DecimalSize(precision);
-    } break;
-    default:
-      break;
-  }
-  auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type),
-                                   get_logical_type(type), byte_width, precision, scale);
-  NodePtr node_ =
-      GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
-  return std::static_pointer_cast<GroupNode>(node_);
-}
-
 namespace internal {
 
 void AssertArraysEqual(const Array& expected, const Array& actual) {
@@ -427,13 +364,113 @@
   }
 }
 
-void AssertTablesEqual(const Table& expected, const Table& actual) {
+void PrintColumn(const Column& col, std::stringstream* ss) {
+  const ChunkedArray& carr = *col.data();
+  for (int i = 0; i < carr.num_chunks(); ++i) {
+    auto c1 = carr.chunk(i);
+    *ss << "Chunk " << i << std::endl;
+    EXPECT_OK(::arrow::PrettyPrint(*c1, 0, ss));
+    *ss << std::endl;
+  }
+}
+
+void AssertTablesEqual(const Table& expected, const Table& actual,
+                       bool same_chunk_layout = true) {
   ASSERT_EQ(expected.num_columns(), actual.num_columns());
 
-  for (int i = 0; i < actual.num_columns(); ++i) {
-    AssertChunkedEqual(*expected.column(i)->data(), *actual.column(i)->data());
+  if (same_chunk_layout) {
+    for (int i = 0; i < actual.num_columns(); ++i) {
+      AssertChunkedEqual(*expected.column(i)->data(), *actual.column(i)->data());
+    }
+  } else {
+    std::stringstream ss;
+    if (!actual.Equals(expected)) {
+      for (int i = 0; i < expected.num_columns(); ++i) {
+        ss << "Actual column " << i << std::endl;
+        PrintColumn(*actual.column(i), &ss);
+
+        ss << "Expected column " << i << std::endl;
+        PrintColumn(*expected.column(i), &ss);
+      }
+      FAIL() << ss.str();
+    }
   }
-  ASSERT_TRUE(actual.Equals(expected));
+}
+
+void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
+                       int64_t row_group_size, const std::vector<int>& column_subset,
+                       std::shared_ptr<Table>* out,
+                       const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+                           default_arrow_writer_properties()) {
+  std::shared_ptr<Buffer> buffer;
+  WriteTableToBuffer(table, num_threads, row_group_size, arrow_properties, &buffer);
+
+  std::unique_ptr<FileReader> reader;
+  ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+                              ::arrow::default_memory_pool(),
+                              ::parquet::default_reader_properties(), nullptr, &reader));
+
+  reader->set_num_threads(num_threads);
+
+  if (column_subset.size() > 0) {
+    ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
+  } else {
+    // Read everything
+    ASSERT_OK_NO_THROW(reader->ReadTable(out));
+  }
+}
+
+void CheckSimpleRoundtrip(const std::shared_ptr<Table>& table, int64_t row_group_size,
+                          const std::shared_ptr<ArrowWriterProperties>& arrow_properties =
+                              default_arrow_writer_properties()) {
+  std::shared_ptr<Table> result;
+  DoSimpleRoundtrip(table, 1, row_group_size, {}, &result, arrow_properties);
+  AssertTablesEqual(*table, *result, false);
+}
+
+static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::DataType& type,
+                                                   Repetition::type repetition) {
+  int32_t byte_width = -1;
+  int32_t precision = -1;
+  int32_t scale = -1;
+
+  switch (type.id()) {
+    case ::arrow::Type::DICTIONARY: {
+      const auto& dict_type = static_cast<const ::arrow::DictionaryType&>(type);
+      const ::DataType& values_type = *dict_type.dictionary()->type();
+      switch (values_type.id()) {
+        case ::arrow::Type::FIXED_SIZE_BINARY:
+          byte_width =
+              static_cast<const ::arrow::FixedSizeBinaryType&>(values_type).byte_width();
+          break;
+        case ::arrow::Type::DECIMAL: {
+          const auto& decimal_type =
+              static_cast<const ::arrow::Decimal128Type&>(values_type);
+          precision = decimal_type.precision();
+          scale = decimal_type.scale();
+          byte_width = DecimalSize(precision);
+        } break;
+        default:
+          break;
+      }
+    } break;
+    case ::arrow::Type::FIXED_SIZE_BINARY:
+      byte_width = static_cast<const ::arrow::FixedSizeBinaryType&>(type).byte_width();
+      break;
+    case ::arrow::Type::DECIMAL: {
+      const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(type);
+      precision = decimal_type.precision();
+      scale = decimal_type.scale();
+      byte_width = DecimalSize(precision);
+    } break;
+    default:
+      break;
+  }
+  auto pnode = PrimitiveNode::Make("column1", repetition, get_physical_type(type),
+                                   get_logical_type(type), byte_width, precision, scale);
+  NodePtr node_ =
+      GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+  return std::static_pointer_cast<GroupNode>(node_);
 }
 
 template <typename TestType>
@@ -527,10 +564,7 @@
   }
 
   void CheckRoundTrip(const std::shared_ptr<Table>& table) {
-    std::shared_ptr<Table> result;
-    DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
-
-    AssertTablesEqual(*table, *result);
+    CheckSimpleRoundtrip(table, table->num_rows());
   }
 
   template <typename ArrayType>
@@ -1315,32 +1349,48 @@
 
   auto f0 = field("f0", ::arrow::date64());
   auto f1 = field("f1", ::arrow::time32(TimeUnit::SECOND));
-  std::shared_ptr<::arrow::Schema> schema(new ::arrow::Schema({f0, f1}));
+  auto f2 = field("f2", ::arrow::date64());
+  auto f3 = field("f3", ::arrow::time32(TimeUnit::SECOND));
+
+  auto schema = ::arrow::schema({f0, f1, f2, f3});
 
   std::vector<int64_t> a0_values = {1489190400000, 1489276800000, 1489363200000,
                                     1489449600000, 1489536000000, 1489622400000};
   std::vector<int32_t> a1_values = {0, 1, 2, 3, 4, 5};
 
-  std::shared_ptr<Array> a0, a1, x0, x1;
+  std::shared_ptr<Array> a0, a1, a0_nonnull, a1_nonnull, x0, x1, x0_nonnull, x1_nonnull;
+
   ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), is_valid, a0_values, &a0);
+  ArrayFromVector<::arrow::Date64Type, int64_t>(f0->type(), a0_values, &a0_nonnull);
+
   ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), is_valid, a1_values, &a1);
+  ArrayFromVector<::arrow::Time32Type, int32_t>(f1->type(), a1_values, &a1_nonnull);
 
   std::vector<std::shared_ptr<::arrow::Column>> columns = {
-      std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1)};
+      std::make_shared<Column>("f0", a0), std::make_shared<Column>("f1", a1),
+      std::make_shared<Column>("f2", a0_nonnull),
+      std::make_shared<Column>("f3", a1_nonnull)};
   auto table = Table::Make(schema, columns);
 
   // Expected schema and values
   auto e0 = field("f0", ::arrow::date32());
   auto e1 = field("f1", ::arrow::time32(TimeUnit::MILLI));
-  std::shared_ptr<::arrow::Schema> ex_schema(new ::arrow::Schema({e0, e1}));
+  auto e2 = field("f2", ::arrow::date32());
+  auto e3 = field("f3", ::arrow::time32(TimeUnit::MILLI));
+  auto ex_schema = ::arrow::schema({e0, e1, e2, e3});
 
   std::vector<int32_t> x0_values = {17236, 17237, 17238, 17239, 17240, 17241};
   std::vector<int32_t> x1_values = {0, 1000, 2000, 3000, 4000, 5000};
   ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), is_valid, x0_values, &x0);
+  ArrayFromVector<::arrow::Date32Type, int32_t>(e0->type(), x0_values, &x0_nonnull);
+
   ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), is_valid, x1_values, &x1);
+  ArrayFromVector<::arrow::Time32Type, int32_t>(e1->type(), x1_values, &x1_nonnull);
 
   std::vector<std::shared_ptr<::arrow::Column>> ex_columns = {
-      std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1)};
+      std::make_shared<Column>("f0", x0), std::make_shared<Column>("f1", x1),
+      std::make_shared<Column>("f2", x0_nonnull),
+      std::make_shared<Column>("f3", x1_nonnull)};
   auto ex_table = Table::Make(ex_schema, ex_columns);
 
   std::shared_ptr<Table> result;
@@ -1375,6 +1425,43 @@
   *out = Table::Make(schema, columns);
 }
 
+void MakeListArray(int num_rows, std::shared_ptr<::DataType>* out_type,
+                   std::shared_ptr<Array>* out_array) {
+  ::arrow::Int32Builder offset_builder;
+
+  std::vector<int32_t> length_draws;
+  randint(num_rows, 0, 100, &length_draws);
+
+  std::vector<int32_t> offset_values;
+
+  // Make sure some of them are length 0
+  int32_t total_elements = 0;
+  for (size_t i = 0; i < length_draws.size(); ++i) {
+    if (length_draws[i] < 10) {
+      length_draws[i] = 0;
+    }
+    offset_values.push_back(total_elements);
+    total_elements += length_draws[i];
+  }
+  offset_values.push_back(total_elements);
+
+  std::vector<int8_t> value_draws;
+  randint(total_elements, 0, 100, &value_draws);
+
+  std::vector<bool> is_valid;
+  random_is_valid(total_elements, 0.1, &is_valid);
+
+  std::shared_ptr<Array> values, offsets;
+  ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(), is_valid,
+                                                      value_draws, &values);
+  ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values, &offsets);
+
+  ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values, default_memory_pool(),
+                                           out_array));
+
+  *out_type = ::arrow::list(::arrow::int8());
+}
+
 TEST(TestArrowReadWrite, MultithreadedRead) {
   const int num_columns = 20;
   const int num_rows = 1000;
@@ -1464,51 +1551,16 @@
   AssertTablesEqual(*expected, *result);
 }
 
-void MakeListTable(int num_rows, std::shared_ptr<Table>* out) {
-  ::arrow::Int32Builder offset_builder;
-
-  std::vector<int32_t> length_draws;
-  randint(num_rows, 0, 100, &length_draws);
-
-  std::vector<int32_t> offset_values;
-
-  // Make sure some of them are length 0
-  int32_t total_elements = 0;
-  for (size_t i = 0; i < length_draws.size(); ++i) {
-    if (length_draws[i] < 10) {
-      length_draws[i] = 0;
-    }
-    offset_values.push_back(total_elements);
-    total_elements += length_draws[i];
-  }
-  offset_values.push_back(total_elements);
-
-  std::vector<int8_t> value_draws;
-  randint(total_elements, 0, 100, &value_draws);
-
-  std::vector<bool> is_valid;
-  random_is_valid(total_elements, 0.1, &is_valid);
-
-  std::shared_ptr<Array> values, offsets;
-  ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(), is_valid,
-                                                      value_draws, &values);
-  ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values, &offsets);
-
-  std::shared_ptr<Array> list_array;
-  ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values, default_memory_pool(),
-                                           &list_array));
-
-  auto f1 = ::arrow::field("a", ::arrow::list(::arrow::int8()));
-  auto schema = ::arrow::schema({f1});
-  std::vector<std::shared_ptr<Array>> arrays = {list_array};
-  *out = Table::Make(schema, arrays);
-}
-
 TEST(TestArrowReadWrite, ListLargeRecords) {
   const int num_rows = 50;
 
-  std::shared_ptr<Table> table;
-  MakeListTable(num_rows, &table);
+  std::shared_ptr<Array> list_array;
+  std::shared_ptr<::DataType> list_type;
+
+  MakeListArray(num_rows, &list_type, &list_array);
+
+  auto schema = ::arrow::schema({::arrow::field("a", list_type)});
+  std::shared_ptr<Table> table = Table::Make(schema, {list_array});
 
   std::shared_ptr<Buffer> buffer;
   WriteTableToBuffer(table, 1, 100, default_arrow_writer_properties(), &buffer);
@@ -1549,6 +1601,74 @@
   ASSERT_TRUE(table->Equals(*chunked_table));
 }
 
+typedef std::function<void(int, std::shared_ptr<::DataType>*, std::shared_ptr<Array>*)>
+    ArrayFactory;
+
+template <typename ArrowType>
+struct GenerateArrayFunctor {
+  explicit GenerateArrayFunctor(double pct_null = 0.1) : pct_null(pct_null) {}
+
+  void operator()(int length, std::shared_ptr<::DataType>* type,
+                  std::shared_ptr<Array>* array) {
+    using T = typename ArrowType::c_type;
+
+    // TODO(wesm): generate things other than integers
+    std::vector<T> draws;
+    randint(length, 0, 100, &draws);
+
+    std::vector<bool> is_valid;
+    random_is_valid(length, this->pct_null, &is_valid);
+
+    *type = ::arrow::TypeTraits<ArrowType>::type_singleton();
+    ::arrow::ArrayFromVector<ArrowType, T>(*type, is_valid, draws, array);
+  }
+
+  double pct_null;
+};
+
+typedef std::function<void(int, std::shared_ptr<::DataType>*, std::shared_ptr<Array>*)>
+    ArrayFactory;
+
+auto GenerateInt32 = [](int length, std::shared_ptr<::DataType>* type,
+                        std::shared_ptr<Array>* array) {
+  GenerateArrayFunctor<::arrow::Int32Type> func;
+  func(length, type, array);
+};
+
+auto GenerateList = [](int length, std::shared_ptr<::DataType>* type,
+                       std::shared_ptr<Array>* array) {
+  MakeListArray(length, type, array);
+};
+
+TEST(TestArrowReadWrite, TableWithChunkedColumns) {
+  std::vector<ArrayFactory> functions = {GenerateInt32, GenerateList};
+
+  std::vector<int> chunk_sizes = {2, 4, 10, 2};
+  const int64_t total_length = 18;
+
+  for (const auto& datagen_func : functions) {
+    ::arrow::ArrayVector arrays;
+    std::shared_ptr<Array> arr;
+    std::shared_ptr<::DataType> type;
+    datagen_func(total_length, &type, &arr);
+
+    int64_t offset = 0;
+    for (int chunk_size : chunk_sizes) {
+      arrays.push_back(arr->Slice(offset, chunk_size));
+      offset += chunk_size;
+    }
+
+    auto field = ::arrow::field("fname", type);
+    auto schema = ::arrow::schema({field});
+    auto col = std::make_shared<::arrow::Column>(field, arrays);
+    auto table = Table::Make(schema, {col});
+
+    CheckSimpleRoundtrip(table, 2);
+    CheckSimpleRoundtrip(table, 3);
+    CheckSimpleRoundtrip(table, 10);
+  }
+}
+
 TEST(TestArrowWrite, CheckChunkSize) {
   const int num_columns = 2;
   const int num_rows = 128;
@@ -1943,13 +2063,13 @@
 
 class TestArrowReaderAdHocSpark
     : public ::testing::TestWithParam<
-          std::tuple<std::string, std::shared_ptr<::arrow::DataType>>> {};
+          std::tuple<std::string, std::shared_ptr<::DataType>>> {};
 
 TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) {
   std::string path(std::getenv("PARQUET_TEST_DATA"));
 
   std::string filename;
-  std::shared_ptr<::arrow::DataType> decimal_type;
+  std::shared_ptr<::DataType> decimal_type;
   std::tie(filename, decimal_type) = GetParam();
 
   path += "/" + filename;
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 771b996..b33eda1 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -62,8 +62,8 @@
     for (int i = 0; i < expected_schema->num_fields(); ++i) {
       auto lhs = result_schema_->field(i);
       auto rhs = expected_schema->field(i);
-      EXPECT_TRUE(lhs->Equals(rhs))
-          << i << " " << lhs->ToString() << " != " << rhs->ToString();
+      EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
+                                    << " != " << rhs->ToString();
     }
   }
 
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 53065a6..9318305 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -717,8 +717,7 @@
 
 template <typename ArrowType, typename ParquetType>
 using supports_fast_path =
-    typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value,
-                            ParquetType>::type;
+    typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value>::type;
 
 template <typename ArrowType, typename ParquetType, typename Enable = void>
 struct TransferFunctor {
@@ -728,6 +727,10 @@
   Status operator()(RecordReader* reader, MemoryPool* pool,
                     const std::shared_ptr<::arrow::DataType>& type,
                     std::shared_ptr<Array>* out) {
+    static_assert(!std::is_same<ArrowType, ::arrow::Int32Type>::value,
+                  "The fast path transfer functor should be used "
+                  "for primitive values");
+
     int64_t length = reader->values_written();
     std::shared_ptr<Buffer> data;
     RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data));
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index d7001aa..85d5bd3 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -31,6 +31,7 @@
 
 using arrow::Array;
 using arrow::BinaryArray;
+using arrow::ChunkedArray;
 using arrow::FixedSizeBinaryArray;
 using arrow::Decimal128Array;
 using arrow::BooleanArray;
@@ -65,12 +66,12 @@
   return default_writer_properties;
 }
 
+namespace {
+
 class LevelBuilder {
  public:
   explicit LevelBuilder(MemoryPool* pool)
-      : def_levels_(::arrow::int16(), pool), rep_levels_(::arrow::int16(), pool) {
-    def_levels_buffer_ = std::make_shared<PoolBuffer>(pool);
-  }
+      : def_levels_(::arrow::int16(), pool), rep_levels_(::arrow::int16(), pool) {}
 
   Status VisitInline(const Array& array);
 
@@ -80,7 +81,6 @@
     array_offsets_.push_back(static_cast<int32_t>(array.offset()));
     valid_bitmaps_.push_back(array.null_bitmap_data());
     null_counts_.push_back(array.null_count());
-    values_type_ = array.type_id();
     values_array_ = std::make_shared<T>(array.data());
     return Status::OK();
   }
@@ -106,13 +106,12 @@
   NOT_IMPLEMENTED_VISIT(Struct)
   NOT_IMPLEMENTED_VISIT(Union)
   NOT_IMPLEMENTED_VISIT(Dictionary)
-  NOT_IMPLEMENTED_VISIT(Interval)
 
   Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
-                        int64_t* values_offset, ::arrow::Type::type* values_type,
-                        int64_t* num_values, int64_t* num_levels,
-                        std::shared_ptr<Buffer>* def_levels,
-                        std::shared_ptr<Buffer>* rep_levels,
+                        int64_t* values_offset, int64_t* num_values, int64_t* num_levels,
+                        const std::shared_ptr<PoolBuffer>& def_levels_scratch,
+                        std::shared_ptr<Buffer>* def_levels_out,
+                        std::shared_ptr<Buffer>* rep_levels_out,
                         std::shared_ptr<Array>* values_array) {
     // Work downwards to extract bitmaps and offsets
     min_offset_idx_ = 0;
@@ -120,7 +119,6 @@
     RETURN_NOT_OK(VisitInline(array));
     *num_values = max_offset_idx_ - min_offset_idx_;
     *values_offset = min_offset_idx_;
-    *values_type = values_type_;
     *values_array = values_array_;
 
     // Walk downwards to extract nullability
@@ -139,11 +137,12 @@
     // Generate the levels.
     if (nullable_.size() == 1) {
       // We have a PrimitiveArray
-      *rep_levels = nullptr;
+      *rep_levels_out = nullptr;
       if (nullable_[0]) {
-        RETURN_NOT_OK(def_levels_buffer_->Resize(array.length() * sizeof(int16_t)));
+        RETURN_NOT_OK(
+            def_levels_scratch->Resize(array.length() * sizeof(int16_t), false));
         auto def_levels_ptr =
-            reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
+            reinterpret_cast<int16_t*>(def_levels_scratch->mutable_data());
         if (array.null_count() == 0) {
           std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
         } else if (array.null_count() == array.length()) {
@@ -152,17 +151,14 @@
           ::arrow::internal::BitmapReader valid_bits_reader(
               array.null_bitmap_data(), array.offset(), array.length());
           for (int i = 0; i < array.length(); i++) {
-            if (valid_bits_reader.IsSet()) {
-              def_levels_ptr[i] = 1;
-            } else {
-              def_levels_ptr[i] = 0;
-            }
+            def_levels_ptr[i] = valid_bits_reader.IsSet() ? 1 : 0;
             valid_bits_reader.Next();
           }
         }
-        *def_levels = def_levels_buffer_;
+
+        *def_levels_out = def_levels_scratch;
       } else {
-        *def_levels = nullptr;
+        *def_levels_out = nullptr;
       }
       *num_levels = array.length();
     } else {
@@ -170,12 +166,13 @@
       RETURN_NOT_OK(HandleListEntries(0, 0, 0, array.length()));
 
       std::shared_ptr<Array> def_levels_array;
-      RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
-      *def_levels = static_cast<PrimitiveArray*>(def_levels_array.get())->values();
-
       std::shared_ptr<Array> rep_levels_array;
+
+      RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
       RETURN_NOT_OK(rep_levels_.Finish(&rep_levels_array));
-      *rep_levels = static_cast<PrimitiveArray*>(rep_levels_array.get())->values();
+
+      *def_levels_out = static_cast<PrimitiveArray*>(def_levels_array.get())->values();
+      *rep_levels_out = static_cast<PrimitiveArray*>(rep_levels_array.get())->values();
       *num_levels = rep_levels_array->length();
     }
 
@@ -242,7 +239,6 @@
 
  private:
   Int16Builder def_levels_;
-  std::shared_ptr<PoolBuffer> def_levels_buffer_;
   Int16Builder rep_levels_;
 
   std::vector<int64_t> null_counts_;
@@ -253,7 +249,6 @@
 
   int64_t min_offset_idx_;
   int64_t max_offset_idx_;
-  ::arrow::Type::type values_type_;
   std::shared_ptr<Array> values_array_;
 };
 
@@ -261,164 +256,221 @@
   return VisitArrayInline(array, this);
 }
 
-class FileWriter::Impl {
+struct ColumnWriterContext {
+  ColumnWriterContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
+      : memory_pool(memory_pool), properties(properties) {
+    this->data_buffer = std::make_shared<PoolBuffer>(memory_pool);
+    this->def_levels_buffer = std::make_shared<PoolBuffer>(memory_pool);
+  }
+
+  template <typename T>
+  Status GetScratchData(const int64_t num_values, T** out) {
+    RETURN_NOT_OK(this->data_buffer->Resize(num_values * sizeof(T), false));
+    *out = reinterpret_cast<T*>(this->data_buffer->mutable_data());
+    return Status::OK();
+  }
+
+  MemoryPool* memory_pool;
+  ArrowWriterProperties* properties;
+
+  // Buffer used for storing the data of an array converted to the physical type
+  // as expected by parquet-cpp.
+  std::shared_ptr<PoolBuffer> data_buffer;
+
+  // We use the shared ownership of this buffer
+  std::shared_ptr<PoolBuffer> def_levels_buffer;
+};
+
+Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type) {
+  if (type.id() == ::arrow::Type::LIST || type.id() == ::arrow::Type::STRUCT) {
+    if (type.num_children() != 1) {
+      return Status::Invalid("Nested column branch had multiple children");
+    }
+    return GetLeafType(*type.child(0)->type(), leaf_type);
+  } else {
+    *leaf_type = type.id();
+    return Status::OK();
+  }
+}
+
+class ArrowColumnWriter {
  public:
-  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
-       const std::shared_ptr<ArrowWriterProperties>& arrow_properties);
+  ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer,
+                    const std::shared_ptr<Field>& field)
+      : ctx_(ctx), writer_(column_writer), field_(field) {}
 
-  Status NewRowGroup(int64_t chunk_size);
+  Status Write(const Array& data);
 
+  Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) {
+    int64_t absolute_position = 0;
+    int chunk_index = 0;
+    int64_t chunk_offset = 0;
+    while (chunk_index < data.num_chunks() && absolute_position < offset) {
+      const int64_t chunk_length = data.chunk(chunk_index)->length();
+      if (absolute_position + chunk_length > offset) {
+        // Relative offset into the chunk to reach the desired start offset for
+        // writing
+        chunk_offset = offset - absolute_position;
+        break;
+      } else {
+        ++chunk_index;
+        absolute_position += chunk_length;
+      }
+    }
+
+    if (absolute_position >= data.length()) {
+      return Status::Invalid("Cannot write data at offset past end of chunked array");
+    }
+
+    int64_t values_written = 0;
+    while (values_written < size) {
+      const Array& chunk = *data.chunk(chunk_index);
+      const int64_t available_values = chunk.length() - chunk_offset;
+      const int64_t chunk_write_size = std::min(size - values_written, available_values);
+
+      // The chunk offset here will be 0 except for possibly the first chunk
+      // because of the advancing logic above
+      std::shared_ptr<Array> array_to_write = chunk.Slice(chunk_offset, chunk_write_size);
+      RETURN_NOT_OK(Write(*array_to_write));
+
+      if (chunk_write_size == available_values) {
+        chunk_offset = 0;
+        ++chunk_index;
+      }
+      values_written += chunk_write_size;
+    }
+
+    return Status::OK();
+  }
+
+  Status Close() {
+    PARQUET_CATCH_NOT_OK(writer_->Close());
+    return Status::OK();
+  }
+
+ private:
   template <typename ParquetType, typename ArrowType>
-  Status TypedWriteBatch(ColumnWriter* column_writer, const std::shared_ptr<Array>& data,
-                         int64_t num_levels, const int16_t* def_levels,
+  Status TypedWriteBatch(const Array& data, int64_t num_levels, const int16_t* def_levels,
                          const int16_t* rep_levels);
 
-  Status WriteTimestamps(ColumnWriter* column_writer, const std::shared_ptr<Array>& data,
-                         int64_t num_levels, const int16_t* def_levels,
+  Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels,
                          const int16_t* rep_levels);
 
-  Status WriteTimestampsCoerce(ColumnWriter* column_writer,
-                               const std::shared_ptr<Array>& data, int64_t num_levels,
+  Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
                                const int16_t* def_levels, const int16_t* rep_levels);
 
   template <typename ParquetType, typename ArrowType>
-  Status WriteNonNullableBatch(TypedColumnWriter<ParquetType>* column_writer,
-                               const ArrowType& type, int64_t num_values,
+  Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
                                int64_t num_levels, const int16_t* def_levels,
                                const int16_t* rep_levels,
-                               const typename ArrowType::c_type* data_ptr);
+                               const typename ArrowType::c_type* values);
 
   template <typename ParquetType, typename ArrowType>
-  Status WriteNullableBatch(TypedColumnWriter<ParquetType>* column_writer,
-                            const ArrowType& type, int64_t num_values, int64_t num_levels,
+  Status WriteNullableBatch(const ArrowType& type, int64_t num_values, int64_t num_levels,
                             const int16_t* def_levels, const int16_t* rep_levels,
                             const uint8_t* valid_bits, int64_t valid_bits_offset,
-                            const typename ArrowType::c_type* data_ptr);
+                            const typename ArrowType::c_type* values);
 
-  Status WriteColumnChunk(const Array& data);
-  Status Close();
+  template <typename ParquetType>
+  Status WriteBatch(int64_t num_levels, const int16_t* def_levels,
+                    const int16_t* rep_levels,
+                    const typename ParquetType::c_type* values) {
+    auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+    PARQUET_CATCH_NOT_OK(
+        typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values));
+    return Status::OK();
+  }
 
-  const WriterProperties& properties() const { return *writer_->properties(); }
+  template <typename ParquetType>
+  Status WriteBatchSpaced(int64_t num_levels, const int16_t* def_levels,
+                          const int16_t* rep_levels, const uint8_t* valid_bits,
+                          int64_t valid_bits_offset,
+                          const typename ParquetType::c_type* values) {
+    auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
+    PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced(
+        num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values));
+    return Status::OK();
+  }
 
-  virtual ~Impl() {}
-
- private:
-  friend class FileWriter;
-
-  MemoryPool* pool_;
-  // Buffer used for storing the data of an array converted to the physical type
-  // as expected by parquet-cpp.
-  PoolBuffer data_buffer_;
-  std::unique_ptr<ParquetFileWriter> writer_;
-  RowGroupWriter* row_group_writer_;
-  std::shared_ptr<ArrowWriterProperties> arrow_properties_;
-  bool closed_;
+  ColumnWriterContext* ctx_;
+  ColumnWriter* writer_;
+  std::shared_ptr<Field> field_;
 };
 
-FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
-                       const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
-    : pool_(pool),
-      data_buffer_(pool),
-      writer_(std::move(writer)),
-      row_group_writer_(nullptr),
-      arrow_properties_(arrow_properties),
-      closed_(false) {}
-
-Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
-  if (row_group_writer_ != nullptr) {
-    PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
-  }
-  PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
-  return Status::OK();
-}
-
-// ----------------------------------------------------------------------
-// Column type specialization
-
 template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
-                                         const std::shared_ptr<Array>& array,
-                                         int64_t num_levels, const int16_t* def_levels,
-                                         const int16_t* rep_levels) {
+Status ArrowColumnWriter::TypedWriteBatch(const Array& array, int64_t num_levels,
+                                          const int16_t* def_levels,
+                                          const int16_t* rep_levels) {
   using ArrowCType = typename ArrowType::c_type;
 
-  const auto& data = static_cast<const PrimitiveArray&>(*array);
-  auto data_ptr =
+  const auto& data = static_cast<const PrimitiveArray&>(array);
+  auto values =
       reinterpret_cast<const ArrowCType*>(data.values()->data()) + data.offset();
-  auto writer = reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer);
 
-  if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
+  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
     // no nulls, just dump the data
     RETURN_NOT_OK((WriteNonNullableBatch<ParquetType, ArrowType>(
-        writer, static_cast<const ArrowType&>(*array->type()), array->length(),
-        num_levels, def_levels, rep_levels, data_ptr)));
+        static_cast<const ArrowType&>(*array.type()), array.length(), num_levels,
+        def_levels, rep_levels, values)));
   } else {
     const uint8_t* valid_bits = data.null_bitmap_data();
     RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
-        writer, static_cast<const ArrowType&>(*array->type()), data.length(), num_levels,
-        def_levels, rep_levels, valid_bits, data.offset(), data_ptr)));
+        static_cast<const ArrowType&>(*array.type()), data.length(), num_levels,
+        def_levels, rep_levels, valid_bits, data.offset(), values)));
   }
-  PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
 }
 
 template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::WriteNonNullableBatch(
-    TypedColumnWriter<ParquetType>* writer, const ArrowType& type, int64_t num_values,
-    int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
-    const typename ArrowType::c_type* data_ptr) {
+Status ArrowColumnWriter::WriteNonNullableBatch(
+    const ArrowType& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels,
+    const typename ArrowType::c_type* values) {
   using ParquetCType = typename ParquetType::c_type;
-  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
-  auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
-  std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
-  PARQUET_CATCH_NOT_OK(
-      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
-  return Status::OK();
+  ParquetCType* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &buffer));
+
+  std::copy(values, values + num_values, buffer);
+
+  return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer);
 }
 
 template <>
-Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
-    TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
-    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels, const int64_t* data_ptr) {
-  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
-  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
+    const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
+  int32_t* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
+
   for (int i = 0; i < num_values; i++) {
-    buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
+    buffer[i] = static_cast<int32_t>(values[i] / 86400000);
   }
-  PARQUET_CATCH_NOT_OK(
-      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
-  return Status::OK();
+
+  return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
 }
 
 template <>
-Status FileWriter::Impl::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
-    TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
-    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels, const int32_t* data_ptr) {
-  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
-  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
+    const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const int32_t* values) {
+  int32_t* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_levels, &buffer));
   if (type.unit() == TimeUnit::SECOND) {
     for (int i = 0; i < num_values; i++) {
-      buffer_ptr[i] = data_ptr[i] * 1000;
+      buffer[i] = values[i] * 1000;
     }
   } else {
-    std::copy(data_ptr, data_ptr + num_values, buffer_ptr);
+    std::copy(values, values + num_values, buffer);
   }
-  PARQUET_CATCH_NOT_OK(
-      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
-  return Status::OK();
+  return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
 }
 
-#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)         \
-  template <>                                                              \
-  Status FileWriter::Impl::WriteNonNullableBatch<ParquetType, ArrowType>(  \
-      TypedColumnWriter<ParquetType> * writer, const ArrowType& type,      \
-      int64_t num_values, int64_t num_levels, const int16_t* def_levels,   \
-      const int16_t* rep_levels, const CType* data_ptr) {                  \
-    PARQUET_CATCH_NOT_OK(                                                  \
-        writer->WriteBatch(num_levels, def_levels, rep_levels, data_ptr)); \
-    return Status::OK();                                                   \
+#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                 \
+  template <>                                                                      \
+  Status ArrowColumnWriter::WriteNonNullableBatch<ParquetType, ArrowType>(         \
+      const ArrowType& type, int64_t num_values, int64_t num_levels,               \
+      const int16_t* def_levels, const int16_t* rep_levels, const CType* buffer) { \
+    return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer);    \
   }
 
 NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
@@ -429,96 +481,68 @@
 NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
 
 template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
-                                            const ArrowType& type, int64_t num_values,
-                                            int64_t num_levels, const int16_t* def_levels,
-                                            const int16_t* rep_levels,
-                                            const uint8_t* valid_bits,
-                                            int64_t valid_bits_offset,
-                                            const typename ArrowType::c_type* data_ptr) {
+Status ArrowColumnWriter::WriteNullableBatch(
+    const ArrowType& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const typename ArrowType::c_type* values) {
   using ParquetCType = typename ParquetType::c_type;
 
-  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
-  auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
-  ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
-                                                    num_values);
+  ParquetCType* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_levels, &buffer));
   for (int i = 0; i < num_values; i++) {
-    if (valid_bits_reader.IsSet()) {
-      buffer_ptr[i] = static_cast<ParquetCType>(data_ptr[i]);
-    }
-    valid_bits_reader.Next();
+    buffer[i] = static_cast<ParquetCType>(values[i]);
   }
-  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
-      num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
 
-  return Status::OK();
+  return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits,
+                                       valid_bits_offset, buffer);
 }
 
 template <>
-Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
-    TypedColumnWriter<Int32Type>* writer, const ::arrow::Date64Type& type,
-    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
-    const int64_t* data_ptr) {
-  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
-  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
-  ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
-                                                    num_values);
-  for (int i = 0; i < num_values; i++) {
-    if (valid_bits_reader.IsSet()) {
-      // Convert from milliseconds into days since the epoch
-      buffer_ptr[i] = static_cast<int32_t>(data_ptr[i] / 86400000);
-    }
-    valid_bits_reader.Next();
-  }
-  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
-      num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
+    const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const int64_t* values) {
+  int32_t* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
 
-  return Status::OK();
+  for (int i = 0; i < num_values; i++) {
+    // Convert from milliseconds into days since the epoch
+    buffer[i] = static_cast<int32_t>(values[i] / 86400000);
+  }
+
+  return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
+                                     valid_bits_offset, buffer);
 }
 
 template <>
-Status FileWriter::Impl::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
-    TypedColumnWriter<Int32Type>* writer, const ::arrow::Time32Type& type,
-    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
-    const int32_t* data_ptr) {
-  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(int32_t)));
-  auto buffer_ptr = reinterpret_cast<int32_t*>(data_buffer_.mutable_data());
-  ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
-                                                    num_values);
+Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
+    const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const int32_t* values) {
+  int32_t* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &buffer));
 
   if (type.unit() == TimeUnit::SECOND) {
     for (int i = 0; i < num_values; i++) {
-      if (valid_bits_reader.IsSet()) {
-        buffer_ptr[i] = data_ptr[i] * 1000;
-      }
-      valid_bits_reader.Next();
+      buffer[i] = values[i] * 1000;
     }
   } else {
     for (int i = 0; i < num_values; i++) {
-      if (valid_bits_reader.IsSet()) {
-        buffer_ptr[i] = data_ptr[i];
-      }
-      valid_bits_reader.Next();
+      buffer[i] = values[i];
     }
   }
-  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
-      num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
-
-  return Status::OK();
+  return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
+                                     valid_bits_offset, buffer);
 }
 
-#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                        \
-  template <>                                                                          \
-  Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>(                 \
-      TypedColumnWriter<ParquetType> * writer, const ArrowType& type,                  \
-      int64_t num_values, int64_t num_levels, const int16_t* def_levels,               \
-      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset, \
-      const CType* data_ptr) {                                                         \
-    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(                                     \
-        num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \
-    return Status::OK();                                                               \
+#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                          \
+  template <>                                                                            \
+  Status ArrowColumnWriter::WriteNullableBatch<ParquetType, ArrowType>(                  \
+      const ArrowType& type, int64_t num_values, int64_t num_levels,                     \
+      const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,   \
+      int64_t valid_bits_offset, const CType* values) {                                  \
+    return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits, \
+                                         valid_bits_offset, values);                     \
   }
 
 NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
@@ -527,122 +551,99 @@
 NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
 NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
 NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
-
-// ----------------------------------------------------------------------
-// Write timestamps
-
 NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
 NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
 
 template <>
-Status FileWriter::Impl::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
-    TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
-    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
-    const int64_t* data_ptr) {
-  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
-  auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
-  ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
-                                                    num_values);
+Status ArrowColumnWriter::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
+    const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const int64_t* values) {
+  Int96* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
   if (type.unit() == TimeUnit::NANO) {
     for (int i = 0; i < num_values; i++) {
-      if (valid_bits_reader.IsSet()) {
-        internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
-      }
-      valid_bits_reader.Next();
+      internal::NanosecondsToImpalaTimestamp(values[i], &buffer[i]);
     }
   } else {
     return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
   }
-  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
-      num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
-
-  return Status::OK();
+  return WriteBatchSpaced<Int96Type>(num_levels, def_levels, rep_levels, valid_bits,
+                                     valid_bits_offset, buffer);
 }
 
 template <>
-Status FileWriter::Impl::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
-    TypedColumnWriter<Int96Type>* writer, const ::arrow::TimestampType& type,
-    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-    const int16_t* rep_levels, const int64_t* data_ptr) {
-  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(Int96)));
-  auto buffer_ptr = reinterpret_cast<Int96*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
+    const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
+  Int96* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &buffer));
   if (type.unit() == TimeUnit::NANO) {
     for (int i = 0; i < num_values; i++) {
-      internal::NanosecondsToImpalaTimestamp(data_ptr[i], buffer_ptr + i);
+      internal::NanosecondsToImpalaTimestamp(values[i], buffer + i);
     }
   } else {
     return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
   }
-  PARQUET_CATCH_NOT_OK(
-      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
-  return Status::OK();
+  return WriteBatch<Int96Type>(num_levels, def_levels, rep_levels, buffer);
 }
 
-Status FileWriter::Impl::WriteTimestamps(ColumnWriter* column_writer,
-                                         const std::shared_ptr<Array>& values,
-                                         int64_t num_levels, const int16_t* def_levels,
-                                         const int16_t* rep_levels) {
-  const auto& type = static_cast<::arrow::TimestampType&>(*values->type());
+Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_levels,
+                                          const int16_t* def_levels,
+                                          const int16_t* rep_levels) {
+  const auto& type = static_cast<const ::arrow::TimestampType&>(*values.type());
 
   const bool is_nanosecond = type.unit() == TimeUnit::NANO;
 
-  if (is_nanosecond && arrow_properties_->support_deprecated_int96_timestamps()) {
-    return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(
-        column_writer, values, num_levels, def_levels, rep_levels);
+  if (is_nanosecond && ctx_->properties->support_deprecated_int96_timestamps()) {
+    return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values, num_levels,
+                                                              def_levels, rep_levels);
   } else if (is_nanosecond ||
-             (arrow_properties_->coerce_timestamps_enabled() &&
-              (type.unit() != arrow_properties_->coerce_timestamps_unit()))) {
+             (ctx_->properties->coerce_timestamps_enabled() &&
+              (type.unit() != ctx_->properties->coerce_timestamps_unit()))) {
     // Casting is required. This covers several cases
     // * Nanoseconds -> cast to microseconds
     // * coerce_timestamps_enabled_, cast all timestamps to requested unit
-    return WriteTimestampsCoerce(column_writer, values, num_levels, def_levels,
-                                 rep_levels);
+    return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels);
   } else {
     // No casting of timestamps is required, take the fast path
-    return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(
-        column_writer, values, num_levels, def_levels, rep_levels);
+    return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
+                                                              def_levels, rep_levels);
   }
 }
 
-Status FileWriter::Impl::WriteTimestampsCoerce(ColumnWriter* column_writer,
-                                               const std::shared_ptr<Array>& array,
-                                               int64_t num_levels,
-                                               const int16_t* def_levels,
-                                               const int16_t* rep_levels) {
-  // Note that we can only use data_buffer_ here as we write timestamps with the fast
-  // path.
-  RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(int64_t)));
-  int64_t* data_buffer_ptr = reinterpret_cast<int64_t*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels,
+                                                const int16_t* def_levels,
+                                                const int16_t* rep_levels) {
+  int64_t* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<int64_t>(num_levels, &buffer));
 
-  const auto& data = static_cast<const ::arrow::TimestampArray&>(*array);
+  const auto& data = static_cast<const ::arrow::TimestampArray&>(array);
 
-  auto data_ptr = data.raw_values();
-  auto writer = reinterpret_cast<TypedColumnWriter<Int64Type>*>(column_writer);
+  auto values = data.raw_values();
+  const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());
 
-  const auto& type = static_cast<const ::arrow::TimestampType&>(*array->type());
-
-  TimeUnit::type target_unit = arrow_properties_->coerce_timestamps_enabled()
-                                   ? arrow_properties_->coerce_timestamps_unit()
+  TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_enabled()
+                                   ? ctx_->properties->coerce_timestamps_unit()
                                    : TimeUnit::MICRO;
   auto target_type = ::arrow::timestamp(target_unit);
 
   auto DivideBy = [&](const int64_t factor) {
-    for (int64_t i = 0; i < array->length(); i++) {
-      if (!data.IsNull(i) && (data_ptr[i] % factor != 0)) {
+    for (int64_t i = 0; i < array.length(); i++) {
+      if (!data.IsNull(i) && (values[i] % factor != 0)) {
         std::stringstream ss;
         ss << "Casting from " << type.ToString() << " to " << target_type->ToString()
-           << " would lose data: " << data_ptr[i];
+           << " would lose data: " << values[i];
         return Status::Invalid(ss.str());
       }
-      data_buffer_ptr[i] = data_ptr[i] / factor;
+      buffer[i] = values[i] / factor;
     }
     return Status::OK();
   };
 
   auto MultiplyBy = [&](const int64_t factor) {
-    for (int64_t i = 0; i < array->length(); i++) {
-      data_buffer_ptr[i] = data_ptr[i] * factor;
+    for (int64_t i = 0; i < array.length(); i++) {
+      buffer[i] = values[i] * factor;
     }
     return Status::OK();
   };
@@ -664,156 +665,140 @@
     RETURN_NOT_OK(DivideBy(1000));
   }
 
-  if (writer->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
+  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
     // no nulls, just dump the data
     RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
-        writer, static_cast<const ::arrow::TimestampType&>(*target_type), array->length(),
-        num_levels, def_levels, rep_levels, data_buffer_ptr)));
+        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
+        num_levels, def_levels, rep_levels, buffer)));
   } else {
     const uint8_t* valid_bits = data.null_bitmap_data();
     RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
-        writer, static_cast<const ::arrow::TimestampType&>(*target_type), array->length(),
-        num_levels, def_levels, rep_levels, valid_bits, data.offset(), data_buffer_ptr)));
+        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
+        num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer)));
   }
-  PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
 }
 
-// ----------------------------------------------------------------------
-
 // This specialization seems quite similar but it significantly differs in two points:
 // * offset is added at the most latest time to the pointer as we have sub-byte access
 // * Arrow data is stored bitwise thus we cannot use std::copy to transform from
 //   ArrowType::c_type to ParquetType::c_type
 
 template <>
-Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
-    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels) {
-  RETURN_NOT_OK(data_buffer_.Resize(array->length()));
-  auto data = static_cast<const BooleanArray*>(array.get());
-  auto data_ptr = reinterpret_cast<const uint8_t*>(data->values()->data());
-  auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
-  auto writer = reinterpret_cast<TypedColumnWriter<BooleanType>*>(column_writer);
+Status ArrowColumnWriter::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
+    const Array& array, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels) {
+  bool* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<bool>(array.length(), &buffer));
+
+  const auto& data = static_cast<const BooleanArray&>(array);
+  auto values = reinterpret_cast<const uint8_t*>(data.values()->data());
 
   int buffer_idx = 0;
-  int64_t offset = array->offset();
-  for (int i = 0; i < data->length(); i++) {
-    if (!data->IsNull(i)) {
-      buffer_ptr[buffer_idx++] = BitUtil::GetBit(data_ptr, offset + i);
+  int64_t offset = array.offset();
+  for (int i = 0; i < data.length(); i++) {
+    if (!data.IsNull(i)) {
+      buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i);
     }
   }
-  PARQUET_CATCH_NOT_OK(
-      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
-  PARQUET_CATCH_NOT_OK(writer->Close());
-  return Status::OK();
+
+  return WriteBatch<BooleanType>(num_levels, def_levels, rep_levels, buffer);
 }
 
 template <>
-Status FileWriter::Impl::TypedWriteBatch<Int32Type, ::arrow::NullType>(
-    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels) {
-  auto writer = reinterpret_cast<TypedColumnWriter<Int32Type>*>(column_writer);
-
-  PARQUET_CATCH_NOT_OK(writer->WriteBatch(num_levels, def_levels, rep_levels, nullptr));
-  PARQUET_CATCH_NOT_OK(writer->Close());
-  return Status::OK();
+Status ArrowColumnWriter::TypedWriteBatch<Int32Type, ::arrow::NullType>(
+    const Array& array, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels) {
+  return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, nullptr);
 }
 
 template <>
-Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
-    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels) {
-  RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(ByteArray)));
-  auto data = static_cast<const BinaryArray*>(array.get());
-  auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data());
+Status ArrowColumnWriter::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
+    const Array& array, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels) {
+  ByteArray* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<ByteArray>(num_levels, &buffer));
+
+  const auto& data = static_cast<const BinaryArray&>(array);
+
   // In the case of an array consisting of only empty strings or all null,
-  // data->data() points already to a nullptr, thus data->data()->data() will
+  // data.data() points already to a nullptr, thus data.data()->data() will
   // segfault.
-  const uint8_t* data_ptr = nullptr;
-  if (data->value_data()) {
-    data_ptr = reinterpret_cast<const uint8_t*>(data->value_data()->data());
-    DCHECK(data_ptr != nullptr);
+  const uint8_t* values = nullptr;
+  if (data.value_data()) {
+    values = reinterpret_cast<const uint8_t*>(data.value_data()->data());
+    DCHECK(values != nullptr);
   }
-  auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(column_writer);
 
   // Slice offset is accounted for in raw_value_offsets
-  const int32_t* value_offset = data->raw_value_offsets();
+  const int32_t* value_offset = data.raw_value_offsets();
 
-  if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
+  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
     // no nulls, just dump the data
-    for (int64_t i = 0; i < data->length(); i++) {
-      buffer_ptr[i] =
-          ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
+    for (int64_t i = 0; i < data.length(); i++) {
+      buffer[i] =
+          ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
     }
   } else {
     int buffer_idx = 0;
-    for (int64_t i = 0; i < data->length(); i++) {
-      if (!data->IsNull(i)) {
-        buffer_ptr[buffer_idx++] =
-            ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
+    for (int64_t i = 0; i < data.length(); i++) {
+      if (!data.IsNull(i)) {
+        buffer[buffer_idx++] =
+            ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
       }
     }
   }
-  PARQUET_CATCH_NOT_OK(
-      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
-  PARQUET_CATCH_NOT_OK(writer->Close());
-  return Status::OK();
+
+  return WriteBatch<ByteArrayType>(num_levels, def_levels, rep_levels, buffer);
 }
 
 template <>
-Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>(
-    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels) {
-  RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(FLBA), false));
-  const auto& data = static_cast<const FixedSizeBinaryArray&>(*array);
+Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>(
+    const Array& array, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels) {
+  const auto& data = static_cast<const FixedSizeBinaryArray&>(array);
   const int64_t length = data.length();
 
-  auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
+  FLBA* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));
 
-  auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
-
-  if (writer->descr()->schema_node()->is_required() || data.null_count() == 0) {
+  if (writer_->descr()->schema_node()->is_required() || data.null_count() == 0) {
     // no nulls, just dump the data
     // todo(advancedxy): use a writeBatch to avoid this step
     for (int64_t i = 0; i < length; i++) {
-      buffer_ptr[i] = FixedLenByteArray(data.GetValue(i));
+      buffer[i] = FixedLenByteArray(data.GetValue(i));
     }
   } else {
     int buffer_idx = 0;
     for (int64_t i = 0; i < length; i++) {
       if (!data.IsNull(i)) {
-        buffer_ptr[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
+        buffer[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
       }
     }
   }
-  PARQUET_CATCH_NOT_OK(
-      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
-  PARQUET_CATCH_NOT_OK(writer->Close());
-  return Status::OK();
+
+  return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
 }
 
 template <>
-Status FileWriter::Impl::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
-    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
-    const int16_t* def_levels, const int16_t* rep_levels) {
-  const auto& data = static_cast<const Decimal128Array&>(*array);
+Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
+    const Array& array, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels) {
+  const auto& data = static_cast<const Decimal128Array&>(array);
   const int64_t length = data.length();
 
-  // TODO(phillipc): This is potentially very wasteful if we have a lot of nulls
-  std::vector<uint64_t> big_endian_values(static_cast<size_t>(length) * 2);
-
-  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(FLBA), false));
-  auto buffer_ptr = reinterpret_cast<FLBA*>(data_buffer_.mutable_data());
-
-  auto writer = reinterpret_cast<TypedColumnWriter<FLBAType>*>(column_writer);
+  FLBA* buffer;
+  RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &buffer));
 
   const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*data.type());
   const int32_t offset =
       decimal_type.byte_width() - DecimalSize(decimal_type.precision());
 
   const bool does_not_have_nulls =
-      writer->descr()->schema_node()->is_required() || data.null_count() == 0;
+      writer_->descr()->schema_node()->is_required() || data.null_count() == 0;
+
+  // TODO(phillipc): This is potentially very wasteful if we have a lot of nulls
+  std::vector<uint64_t> big_endian_values(static_cast<size_t>(length) * 2);
 
   // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
   // don't have to keep writing two loops to handle the case where we know there are no
@@ -825,7 +810,7 @@
       auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
       big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
       big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
-      buffer_ptr[i] = FixedLenByteArray(
+      buffer[i] = FixedLenByteArray(
           reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
     }
   } else {
@@ -834,77 +819,30 @@
         auto unsigned_64_bit = reinterpret_cast<const uint64_t*>(data.GetValue(i));
         big_endian_values[j] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[1]);
         big_endian_values[j + 1] = ::arrow::BitUtil::ToBigEndian(unsigned_64_bit[0]);
-        buffer_ptr[buffer_idx++] = FixedLenByteArray(
+        buffer[buffer_idx++] = FixedLenByteArray(
             reinterpret_cast<const uint8_t*>(&big_endian_values[j]) + offset);
         j += 2;
       }
     }
   }
-  PARQUET_CATCH_NOT_OK(
-      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
-  PARQUET_CATCH_NOT_OK(writer->Close());
-  return Status::OK();
+
+  return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
 }
 
-// End of column type specializations
-// ----------------------------------------------------------------------
-
-Status FileWriter::Impl::Close() {
-  if (!closed_) {
-    // Make idempotent
-    closed_ = true;
-    if (row_group_writer_ != nullptr) {
-      PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
-    }
-    PARQUET_CATCH_NOT_OK(writer_->Close());
-  }
-  return Status::OK();
-}
-
-Status FileWriter::NewRowGroup(int64_t chunk_size) {
-  return impl_->NewRowGroup(chunk_size);
-}
-
-Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
-  // DictionaryArrays are not yet handled with a fast path. To still support
-  // writing them as a workaround, we convert them back to their non-dictionary
-  // representation.
-  if (data.type()->id() == ::arrow::Type::DICTIONARY) {
-    const ::arrow::DictionaryType& dict_type =
-        static_cast<const ::arrow::DictionaryType&>(*data.type());
-
-    // TODO(ARROW-1648): Remove this special handling once we require an Arrow
-    // version that has this fixed.
-    if (dict_type.dictionary()->type()->id() == ::arrow::Type::NA) {
-      return WriteColumnChunk(::arrow::NullArray(data.length()));
-    }
-
-    FunctionContext ctx(pool_);
-    std::shared_ptr<Array> plain_array;
-    RETURN_NOT_OK(
-        Cast(&ctx, data, dict_type.dictionary()->type(), CastOptions(), &plain_array));
-    return WriteColumnChunk(*plain_array);
-  }
-
-  ColumnWriter* column_writer;
-  PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
-
-  int current_column_idx = row_group_writer_->current_column();
-  std::shared_ptr<::arrow::Schema> arrow_schema;
-  RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
-                                  writer_->key_value_metadata(), &arrow_schema));
-  std::shared_ptr<Buffer> def_levels_buffer;
-  std::shared_ptr<Buffer> rep_levels_buffer;
-  int64_t values_offset;
+Status ArrowColumnWriter::Write(const Array& data) {
   ::arrow::Type::type values_type;
-  int64_t num_levels;
-  int64_t num_values;
+  RETURN_NOT_OK(GetLeafType(*data.type(), &values_type));
 
   std::shared_ptr<Array> _values_array;
-  LevelBuilder level_builder(pool_);
+  int64_t values_offset;
+  int64_t num_levels;
+  int64_t num_values;
+  LevelBuilder level_builder(ctx_->memory_pool);
+
+  std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
   RETURN_NOT_OK(level_builder.GenerateLevels(
-      data, arrow_schema->field(0), &values_offset, &values_type, &num_values,
-      &num_levels, &def_levels_buffer, &rep_levels_buffer, &_values_array));
+      data, field_, &values_offset, &num_values, &num_levels, ctx_->def_levels_buffer,
+      &def_levels_buffer, &rep_levels_buffer, &_values_array));
   const int16_t* def_levels = nullptr;
   if (def_levels_buffer) {
     def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
@@ -915,27 +853,26 @@
   }
   std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values);
 
-#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType)  \
-  case ::arrow::Type::ArrowEnum:                             \
-    return TypedWriteBatch<ParquetType, ::arrow::ArrowType>( \
-        column_writer, values_array, num_levels, def_levels, rep_levels);
+#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType)                            \
+  case ::arrow::Type::ArrowEnum:                                                       \
+    return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(*values_array, num_levels, \
+                                                            def_levels, rep_levels);
 
   switch (values_type) {
     case ::arrow::Type::UINT32: {
       if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
         // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need
         // to use the larger Int64Type to store them lossless.
-        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(
-            column_writer, values_array, num_levels, def_levels, rep_levels);
+        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(*values_array, num_levels,
+                                                               def_levels, rep_levels);
       } else {
-        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(
-            column_writer, values_array, num_levels, def_levels, rep_levels);
+        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(*values_array, num_levels,
+                                                               def_levels, rep_levels);
       }
     }
       WRITE_BATCH_CASE(NA, NullType, Int32Type)
     case ::arrow::Type::TIMESTAMP:
-      return WriteTimestamps(column_writer, values_array, num_levels, def_levels,
-                             rep_levels);
+      return WriteTimestamps(*values_array, num_levels, def_levels, rep_levels);
       WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
       WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
       WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
@@ -957,20 +894,130 @@
     default:
       break;
   }
-
-  PARQUET_CATCH_NOT_OK(column_writer->Close());
   std::stringstream ss;
   ss << "Data type not supported as list value: " << values_array->type()->ToString();
   return Status::NotImplemented(ss.str());
 }
 
-Status FileWriter::WriteColumnChunk(const ::arrow::Array& array) {
-  return impl_->WriteColumnChunk(array);
+}  // namespace
+
+// ----------------------------------------------------------------------
+// FileWriter implementation
+
+class FileWriter::Impl {
+ public:
+  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
+       const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
+      : writer_(std::move(writer)),
+        row_group_writer_(nullptr),
+        column_write_context_(pool, arrow_properties.get()),
+        arrow_properties_(arrow_properties),
+        closed_(false) {}
+
+  Status NewRowGroup(int64_t chunk_size) {
+    if (row_group_writer_ != nullptr) {
+      PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+    }
+    PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
+    return Status::OK();
+  }
+
+  Status Close() {
+    if (!closed_) {
+      // Make idempotent
+      closed_ = true;
+      if (row_group_writer_ != nullptr) {
+        PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+      }
+      PARQUET_CATCH_NOT_OK(writer_->Close());
+    }
+    return Status::OK();
+  }
+
+  Status WriteColumnChunk(const Array& data) {
+    // A bit awkward here since cannot instantiate ChunkedArray from const Array&
+    ::arrow::ArrayVector chunks = {::arrow::MakeArray(data.data())};
+    auto chunked_array = std::make_shared<::arrow::ChunkedArray>(chunks);
+    return WriteColumnChunk(chunked_array, 0, data.length());
+  }
+
+  Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
+                          const int64_t size) {
+    // DictionaryArrays are not yet handled with a fast path. To still support
+    // writing them as a workaround, we convert them back to their non-dictionary
+    // representation.
+    if (data->type()->id() == ::arrow::Type::DICTIONARY) {
+      const ::arrow::DictionaryType& dict_type =
+          static_cast<const ::arrow::DictionaryType&>(*data->type());
+
+      // TODO(ARROW-1648): Remove this special handling once we require an Arrow
+      // version that has this fixed.
+      if (dict_type.dictionary()->type()->id() == ::arrow::Type::NA) {
+        auto null_array = std::make_shared<::arrow::NullArray>(data->length());
+        return WriteColumnChunk(*null_array);
+      }
+
+      FunctionContext ctx(this->memory_pool());
+      ::arrow::compute::Datum cast_input(data);
+      ::arrow::compute::Datum cast_output;
+      RETURN_NOT_OK(Cast(&ctx, cast_input, dict_type.dictionary()->type(), CastOptions(),
+                         &cast_output));
+      return WriteColumnChunk(cast_output.chunked_array(), 0, data->length());
+    }
+
+    ColumnWriter* column_writer;
+    PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+
+    // TODO(wesm): This trick to construct a schema for one Parquet root node
+    // will not work for arbitrary nested data
+    int current_column_idx = row_group_writer_->current_column();
+    std::shared_ptr<::arrow::Schema> arrow_schema;
+    RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
+                                    writer_->key_value_metadata(), &arrow_schema));
+
+    ArrowColumnWriter arrow_writer(&column_write_context_, column_writer,
+                                   arrow_schema->field(0));
+
+    RETURN_NOT_OK(arrow_writer.Write(*data, offset, size));
+    return arrow_writer.Close();
+  }
+
+  const WriterProperties& properties() const { return *writer_->properties(); }
+
+  ::arrow::MemoryPool* memory_pool() const { return column_write_context_.memory_pool; }
+
+  virtual ~Impl() {}
+
+ private:
+  friend class FileWriter;
+
+  std::unique_ptr<ParquetFileWriter> writer_;
+  RowGroupWriter* row_group_writer_;
+  ColumnWriterContext column_write_context_;
+  std::shared_ptr<ArrowWriterProperties> arrow_properties_;
+  bool closed_;
+};
+
+Status FileWriter::NewRowGroup(int64_t chunk_size) {
+  return impl_->NewRowGroup(chunk_size);
+}
+
+Status FileWriter::WriteColumnChunk(const ::arrow::Array& data) {
+  return impl_->WriteColumnChunk(data);
+}
+
+Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
+                                    const int64_t offset, const int64_t size) {
+  return impl_->WriteColumnChunk(data, offset, size);
+}
+
+Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data) {
+  return WriteColumnChunk(data, 0, data->length());
 }
 
 Status FileWriter::Close() { return impl_->Close(); }
 
-MemoryPool* FileWriter::memory_pool() const { return impl_->pool_; }
+MemoryPool* FileWriter::memory_pool() const { return impl_->memory_pool(); }
 
 FileWriter::~FileWriter() {}
 
@@ -1020,14 +1067,9 @@
   return Open(schema, pool, wrapper, properties, arrow_properties, writer);
 }
 
-Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
-  // TODO(ARROW-232) Support writing chunked arrays.
-  for (int i = 0; i < table.num_columns(); i++) {
-    if (table.column(i)->data()->num_chunks() != 1) {
-      return Status::NotImplemented("No support for writing chunked arrays yet.");
-    }
-  }
+namespace {}  // namespace
 
+Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
   if (chunk_size <= 0) {
     return Status::Invalid("chunk size per row_group must be greater than 0");
   } else if (chunk_size > impl_->properties().max_row_group_length()) {
@@ -1040,9 +1082,9 @@
 
     RETURN_NOT_OK_ELSE(NewRowGroup(size), PARQUET_IGNORE_NOT_OK(Close()));
     for (int i = 0; i < table.num_columns(); i++) {
-      std::shared_ptr<Array> array = table.column(i)->data()->chunk(0);
-      array = array->Slice(offset, size);
-      RETURN_NOT_OK_ELSE(WriteColumnChunk(*array), PARQUET_IGNORE_NOT_OK(Close()));
+      auto chunked_data = table.column(i)->data();
+      RETURN_NOT_OK_ELSE(WriteColumnChunk(chunked_data, offset, size),
+                         PARQUET_IGNORE_NOT_OK(Close()));
     }
   }
   return Status::OK();
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index 24ba72d..a432850 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -133,15 +133,16 @@
       const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
       std::unique_ptr<FileWriter>* writer);
 
-  /**
-   * Write a Table to Parquet.
-   *
-   * The table shall only consist of columns of primitive type or of primitive lists.
-   */
+  /// \brief Write a Table to Parquet.
   ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);
 
   ::arrow::Status NewRowGroup(int64_t chunk_size);
   ::arrow::Status WriteColumnChunk(const ::arrow::Array& data);
+
+  /// \brief Write ColumnChunk in row group using slice of a ChunkedArray
+  ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
+                                   const int64_t offset, const int64_t size);
+  ::arrow::Status WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data);
   ::arrow::Status Close();
 
   virtual ~FileWriter();
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 7b8c775..6b84748 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -117,6 +117,8 @@
 
   int64_t rows_written() const { return rows_written_; }
 
+  const WriterProperties* properties() { return properties_; }
+
  protected:
   virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
 
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc
index 7b74812..72c71c6 100644
--- a/src/parquet/file_reader.cc
+++ b/src/parquet/file_reader.cc
@@ -64,9 +64,9 @@
     : contents_(std::move(contents)) {}
 
 std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
-  DCHECK(i < metadata()->num_columns())
-      << "The RowGroup only has " << metadata()->num_columns()
-      << "columns, requested column: " << i;
+  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+                                        << metadata()->num_columns()
+                                        << "columns, requested column: " << i;
   const ColumnDescriptor* descr = metadata()->schema()->Column(i);
 
   std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
@@ -76,9 +76,9 @@
 }
 
 std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
-  DCHECK(i < metadata()->num_columns())
-      << "The RowGroup only has " << metadata()->num_columns()
-      << "columns, requested column: " << i;
+  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+                                        << metadata()->num_columns()
+                                        << "columns, requested column: " << i;
   return contents_->GetColumnPageReader(i);
 }
 
@@ -302,9 +302,9 @@
 }
 
 std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
-  DCHECK(i < metadata()->num_row_groups())
-      << "The file only has " << metadata()->num_row_groups()
-      << "row groups, requested reader for: " << i;
+  DCHECK(i < metadata()->num_row_groups()) << "The file only has "
+                                           << metadata()->num_row_groups()
+                                           << "row groups, requested reader for: " << i;
   return contents_->GetRowGroup(i);
 }