PARQUET-859: Flatten parquet/file directory, consolidate file reader, file writer code

I believe this makes the codebase simpler and easier to navigate. Consolidating the file reader/writer code should also make further refactoring and internal improvements more straightforward.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #424 from wesm/PARQUET-859 and squashes the following commits:

c98e3d4 [Wes McKinney] Do not generate Thrift files in source directory
e987f47 [Wes McKinney] Remove superfluous PARQUET_EXPORT
0fb83a8 [Wes McKinney] Consolidate file_reader.h, file_reader-internal.h
ac6e2c5 [Wes McKinney] First cut flattening of parquet/file directory and consolidate writer, writer-internal headers
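
For downstream code, the visible change is in the include paths: the headers under parquet/file/ collapse into flat headers directly under parquet/. A before/after sketch for a typical consumer (the umbrella headers parquet/api/reader.h and parquet/api/writer.h, updated below, continue to work unchanged):

    // Before this patch:
    // #include "parquet/file/metadata.h"
    // #include "parquet/file/printer.h"
    // #include "parquet/file/reader.h"
    // #include "parquet/file/writer.h"

    // After this patch:
    #include "parquet/file_reader.h"
    #include "parquet/file_writer.h"
    #include "parquet/metadata.h"
    #include "parquet/printer.h"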
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8a3558c..4774631 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -375,6 +375,7 @@
 # Dependencies
 ############################################################
 
+include_directories(${CMAKE_CURRENT_BINARY_DIR}/src)
 include_directories(
   ${CMAKE_CURRENT_SOURCE_DIR}/src
 )
@@ -661,7 +662,7 @@
 endif()
 
 # List of thrift output targets
-set(THRIFT_OUTPUT_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src/parquet)
+set(THRIFT_OUTPUT_DIR ${CMAKE_CURRENT_BINARY_DIR}/src/parquet)
 set(THRIFT_OUTPUT_FILES "${THRIFT_OUTPUT_DIR}/parquet_types.cpp")
 set(THRIFT_OUTPUT_FILES ${THRIFT_OUTPUT_FILES} "${THRIFT_OUTPUT_DIR}/parquet_types.h")
 set(THRIFT_OUTPUT_FILES ${THRIFT_OUTPUT_FILES} "${THRIFT_OUTPUT_DIR}/parquet_constants.cpp")
@@ -683,30 +684,23 @@
 # Library config
 
 set(LIBPARQUET_SRCS
-  src/parquet/exception.cc
-  src/parquet/types.cc
-
   src/parquet/arrow/reader.cc
   src/parquet/arrow/record_reader.cc
   src/parquet/arrow/schema.cc
   src/parquet/arrow/writer.cc
-
   src/parquet/column_reader.cc
   src/parquet/column_scanner.cc
   src/parquet/column_writer.cc
-
-  src/parquet/file/metadata.cc
-  src/parquet/file/printer.cc
-  src/parquet/file/reader.cc
-  src/parquet/file/reader-internal.cc
-  src/parquet/file/writer.cc
-  src/parquet/file/writer-internal.cc
-
-  src/parquet/schema.cc
-  src/parquet/statistics.cc
-
+  src/parquet/exception.cc
+  src/parquet/file_reader.cc
+  src/parquet/file_writer.cc
+  src/parquet/metadata.cc
   src/parquet/parquet_constants.cpp
   src/parquet/parquet_types.cpp
+  src/parquet/printer.cc
+  src/parquet/schema.cc
+  src/parquet/statistics.cc
+  src/parquet/types.cc
   src/parquet/util/comparison.cc
   src/parquet/util/memory.cc
 )
@@ -785,7 +779,6 @@
 add_subdirectory(src/parquet)
 add_subdirectory(src/parquet/api)
 add_subdirectory(src/parquet/arrow)
-add_subdirectory(src/parquet/file)
 add_subdirectory(src/parquet/util)
 
 if (NOT MSVC)
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index a2e283e..bc16d8b 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -23,6 +23,10 @@
   column_writer.h
   encoding.h
   exception.h
+  file_reader.h
+  file_writer.h
+  metadata.h
+  printer.h
   properties.h
   schema.h
   statistics.h
@@ -49,9 +53,12 @@
 ADD_PARQUET_TEST(column_reader-test)
 ADD_PARQUET_TEST(column_scanner-test)
 ADD_PARQUET_TEST(column_writer-test)
+ADD_PARQUET_TEST(file-deserialize-test)
+ADD_PARQUET_TEST(file-serialize-test)
 ADD_PARQUET_TEST(properties-test)
 ADD_PARQUET_TEST(statistics-test)
 ADD_PARQUET_TEST(encoding-test)
+ADD_PARQUET_TEST(metadata-test)
 ADD_PARQUET_TEST(public-api-test)
 ADD_PARQUET_TEST(types-test)
 ADD_PARQUET_TEST(reader-test)
diff --git a/src/parquet/api/reader.h b/src/parquet/api/reader.h
index ba9717a..505654f 100644
--- a/src/parquet/api/reader.h
+++ b/src/parquet/api/reader.h
@@ -22,11 +22,9 @@
 #include "parquet/column_reader.h"
 #include "parquet/column_scanner.h"
 #include "parquet/exception.h"
-#include "parquet/file/printer.h"
-#include "parquet/file/reader.h"
-
-// Metadata reader API
-#include "parquet/file/metadata.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/printer.h"
 
 // Schemas
 #include "parquet/api/schema.h"
diff --git a/src/parquet/api/writer.h b/src/parquet/api/writer.h
index cc3ae2a..3b4e42f 100644
--- a/src/parquet/api/writer.h
+++ b/src/parquet/api/writer.h
@@ -18,15 +18,10 @@
 #ifndef PARQUET_API_WRITER_H
 #define PARQUET_API_WRITER_H
 
-// Column reader API
+#include "parquet/api/io.h"
+#include "parquet/api/schema.h"
 #include "parquet/column_writer.h"
 #include "parquet/exception.h"
-#include "parquet/file/writer.h"
-
-// Schemas
-#include "parquet/api/schema.h"
-
-// IO
-#include "parquet/api/io.h"
+#include "parquet/file_writer.h"
 
 #endif  // PARQUET_API_WRITER_H
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index edeef1e..15d2cf7 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -23,8 +23,8 @@
 #include "parquet/arrow/writer.h"
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
 #include "parquet/util/memory.h"
 
 #include "arrow/api.h"
@@ -142,7 +142,6 @@
 
 template <bool nullable, typename ParquetType>
 static void BM_WriteColumn(::benchmark::State& state) {
-  format::ColumnChunk thrift_metadata;
   std::vector<typename ParquetType::c_type> values(BENCHMARK_SIZE, 128);
   std::shared_ptr<::arrow::Table> table = TableFromVector<ParquetType>(values, nullable);
 
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index a8d3824..c10b164 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -34,7 +34,7 @@
 #include "parquet/arrow/test-util.h"
 #include "parquet/arrow/writer.h"
 
-#include "parquet/file/writer.h"
+#include "parquet/file_writer.h"
 
 #include "arrow/api.h"
 #include "arrow/test-util.h"
diff --git a/src/parquet/arrow/record_reader.h b/src/parquet/arrow/record_reader.h
index 8d55f9d..9ca8b68 100644
--- a/src/parquet/arrow/record_reader.h
+++ b/src/parquet/arrow/record_reader.h
@@ -30,7 +30,7 @@
 #include <arrow/memory_pool.h>
 #include <arrow/util/bit-util.h>
 
-#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
 #include "parquet/schema.h"
 #include "parquet/util/visibility.h"
 
diff --git a/src/parquet/column-io-benchmark.cc b/src/parquet/column-io-benchmark.cc
index ec7b52e..7c8d093 100644
--- a/src/parquet/column-io-benchmark.cc
+++ b/src/parquet/column-io-benchmark.cc
@@ -19,8 +19,8 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
+#include "parquet/file_reader.h"
+#include "parquet/parquet_types.h"
 #include "parquet/util/memory.h"
 
 namespace parquet {
@@ -33,8 +33,8 @@
                                          ColumnChunkMetaDataBuilder* metadata,
                                          ColumnDescriptor* schema,
                                          const WriterProperties* properties) {
-  std::unique_ptr<SerializedPageWriter> pager(
-      new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
+  std::unique_ptr<PageWriter> pager =
+      PageWriter::Open(dst, Compression::UNCOMPRESSED, metadata);
   return std::unique_ptr<Int64Writer>(
       new Int64Writer(metadata, std::move(pager), Encoding::PLAIN, properties));
 }
@@ -110,8 +110,8 @@
 std::unique_ptr<Int64Reader> BuildReader(std::shared_ptr<Buffer>& buffer,
                                          int64_t num_values, ColumnDescriptor* schema) {
   std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
-  std::unique_ptr<SerializedPageReader> page_reader(
-      new SerializedPageReader(std::move(source), num_values, Compression::UNCOMPRESSED));
+  std::unique_ptr<PageReader> page_reader =
+      PageReader::Open(std::move(source), num_values, Compression::UNCOMPRESSED);
   return std::unique_ptr<Int64Reader>(new Int64Reader(schema, std::move(page_reader)));
 }
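
The two hunks above show the replacement pattern used throughout this patch: instead of constructing the internal SerializedPageWriter/SerializedPageReader classes directly, callers go through the new static factories, which lets the Serialized* classes drop out of the public headers entirely. A minimal round-trip sketch under the same assumptions as this benchmark (an OutputStream* sink, a ColumnChunkMetaDataBuilder* metadata, an InMemoryInputStream source, and num_values as above):

    // The concrete SerializedPageWriter is selected inside PageWriter::Open
    std::unique_ptr<PageWriter> pager =
        PageWriter::Open(sink, Compression::UNCOMPRESSED, metadata);
    // ... write pages through a ColumnWriter ...
    pager->Close(/*has_dictionary=*/false, /*fallback=*/false);

    // Read the pages back through the matching reader factory
    std::unique_ptr<PageReader> page_reader =
        PageReader::Open(std::move(source), num_values, Compression::UNCOMPRESSED);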
 
diff --git a/src/parquet/column_page.h b/src/parquet/column_page.h
index 85e3bb5..c34eee7 100644
--- a/src/parquet/column_page.h
+++ b/src/parquet/column_page.h
@@ -168,35 +168,6 @@
   bool is_sorted_;
 };
 
-// Abstract page iterator interface. This way, we can feed column pages to the
-// ColumnReader through whatever mechanism we choose
-class PageReader {
- public:
-  virtual ~PageReader() {}
-
-  // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
-  // containing new Page otherwise
-  virtual std::shared_ptr<Page> NextPage() = 0;
-};
-
-class PageWriter {
- public:
-  virtual ~PageWriter() {}
-
-  // The Column Writer decides if dictionary encoding is used if set and
-  // if the dictionary encoding has fallen back to default encoding on reaching dictionary
-  // page limit
-  virtual void Close(bool has_dictionary, bool fallback) = 0;
-
-  virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
-
-  virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
-
-  virtual bool has_compressor() = 0;
-
-  virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
-};
-
 }  // namespace parquet
 
 #endif  // PARQUET_COLUMN_PAGE_H
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index d23e738..91557af 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -24,11 +24,14 @@
 #include <arrow/buffer.h>
 #include <arrow/memory_pool.h>
 #include <arrow/util/bit-util.h>
+#include <arrow/util/compression.h>
 #include <arrow/util/rle-encoding.h>
 
 #include "parquet/column_page.h"
 #include "parquet/encoding-internal.h"
+#include "parquet/parquet_types.h"
 #include "parquet/properties.h"
+#include "parquet/thrift.h"
 
 using arrow::MemoryPool;
 
@@ -89,6 +92,177 @@
   return default_reader_properties;
 }
 
+// ----------------------------------------------------------------------
+// SerializedPageReader deserializes Thrift metadata and pages that have been
+// assembled in a serialized stream for storage in a Parquet file
+
+// This subclass delimits pages appearing in a serialized stream, each preceded
+// by a serialized Thrift format::PageHeader indicating the type of each page
+// and the page metadata.
+class SerializedPageReader : public PageReader {
+ public:
+  SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t total_num_rows,
+                       Compression::type codec, ::arrow::MemoryPool* pool)
+      : stream_(std::move(stream)),
+        decompression_buffer_(AllocateBuffer(pool, 0)),
+        seen_num_rows_(0),
+        total_num_rows_(total_num_rows) {
+    max_page_header_size_ = kDefaultMaxPageHeaderSize;
+    decompressor_ = GetCodecFromArrow(codec);
+  }
+
+  virtual ~SerializedPageReader() {}
+
+  // Implement the PageReader interface
+  std::shared_ptr<Page> NextPage() override;
+
+  void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
+
+ private:
+  std::unique_ptr<InputStream> stream_;
+
+  format::PageHeader current_page_header_;
+  std::shared_ptr<Page> current_page_;
+
+  // Compression codec to use.
+  std::unique_ptr<::arrow::Codec> decompressor_;
+  std::shared_ptr<PoolBuffer> decompression_buffer_;
+
+  // Maximum allowed page header size
+  uint32_t max_page_header_size_;
+
+  // Number of rows read in data pages so far
+  int64_t seen_num_rows_;
+
+  // Number of rows in all the data pages
+  int64_t total_num_rows_;
+};
+
+std::shared_ptr<Page> SerializedPageReader::NextPage() {
+  // Loop here because there may be unhandled page types that we skip until
+  // finding a page that we do know what to do with
+  while (seen_num_rows_ < total_num_rows_) {
+    int64_t bytes_read = 0;
+    int64_t bytes_available = 0;
+    uint32_t header_size = 0;
+    const uint8_t* buffer;
+    uint32_t allowed_page_size = kDefaultPageHeaderSize;
+
+    // Page headers can be very large because of page statistics
+    // We try to deserialize a larger buffer progressively
+    // until a maximum allowed header limit
+    while (true) {
+      buffer = stream_->Peek(allowed_page_size, &bytes_available);
+      if (bytes_available == 0) {
+        return std::shared_ptr<Page>(nullptr);
+      }
+
+      // In/out parameter: set to the actual header size by DeserializeThriftMsg
+      header_size = static_cast<uint32_t>(bytes_available);
+      try {
+        DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
+        break;
+      } catch (std::exception& e) {
+        // Failed to deserialize. Double the allowed page header size and try again
+        std::stringstream ss;
+        ss << e.what();
+        allowed_page_size *= 2;
+        if (allowed_page_size > max_page_header_size_) {
+          ss << "Deserializing page header failed.\n";
+          throw ParquetException(ss.str());
+        }
+      }
+    }
+    // Advance the stream offset
+    stream_->Advance(header_size);
+
+    int compressed_len = current_page_header_.compressed_page_size;
+    int uncompressed_len = current_page_header_.uncompressed_page_size;
+
+    // Read the compressed data page.
+    buffer = stream_->Read(compressed_len, &bytes_read);
+    if (bytes_read != compressed_len) {
+      ParquetException::EofException();
+    }
+
+    // Uncompress it if we need to
+    if (decompressor_ != nullptr) {
+      // Grow the uncompressed buffer if we need to.
+      if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
+        PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
+      }
+      PARQUET_THROW_NOT_OK(
+          decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
+                                    decompression_buffer_->mutable_data()));
+      buffer = decompression_buffer_->data();
+    }
+
+    auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);
+
+    if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) {
+      const format::DictionaryPageHeader& dict_header =
+          current_page_header_.dictionary_page_header;
+
+      bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
+
+      return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
+                                              FromThrift(dict_header.encoding),
+                                              is_sorted);
+    } else if (current_page_header_.type == format::PageType::DATA_PAGE) {
+      const format::DataPageHeader& header = current_page_header_.data_page_header;
+
+      EncodedStatistics page_statistics;
+      if (header.__isset.statistics) {
+        const format::Statistics& stats = header.statistics;
+        if (stats.__isset.max) {
+          page_statistics.set_max(stats.max);
+        }
+        if (stats.__isset.min) {
+          page_statistics.set_min(stats.min);
+        }
+        if (stats.__isset.null_count) {
+          page_statistics.set_null_count(stats.null_count);
+        }
+        if (stats.__isset.distinct_count) {
+          page_statistics.set_distinct_count(stats.distinct_count);
+        }
+      }
+
+      seen_num_rows_ += header.num_values;
+
+      return std::make_shared<DataPage>(
+          page_buffer, header.num_values, FromThrift(header.encoding),
+          FromThrift(header.definition_level_encoding),
+          FromThrift(header.repetition_level_encoding), page_statistics);
+    } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) {
+      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
+      bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false;
+
+      seen_num_rows_ += header.num_values;
+
+      return std::make_shared<DataPageV2>(
+          page_buffer, header.num_values, header.num_nulls, header.num_rows,
+          FromThrift(header.encoding), header.definition_levels_byte_length,
+          header.repetition_levels_byte_length, is_compressed);
+    } else {
+      // We don't know what this page type is. We're allowed to skip non-data
+      // pages.
+      continue;
+    }
+  }
+  return std::shared_ptr<Page>(nullptr);
+}
+
+std::unique_ptr<PageReader> PageReader::Open(std::unique_ptr<InputStream> stream,
+                                             int64_t total_num_rows,
+                                             Compression::type codec,
+                                             ::arrow::MemoryPool* pool) {
+  return std::unique_ptr<PageReader>(
+      new SerializedPageReader(std::move(stream), total_num_rows, codec, pool));
+}
+
+// ----------------------------------------------------------------------
+
 ColumnReader::ColumnReader(const ColumnDescriptor* descr,
                            std::unique_ptr<PageReader> pager, MemoryPool* pool)
     : descr_(descr),
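
NextPage() returns a null shared_ptr on end-of-stream, so a consumer can drain a column chunk with a simple loop. A hedged usage sketch (the stream, row count, and codec would normally come from the column chunk metadata):

    std::unique_ptr<PageReader> pager = PageReader::Open(
        std::move(stream), total_num_rows, Compression::UNCOMPRESSED);
    std::shared_ptr<Page> page;
    while ((page = pager->NextPage()) != nullptr) {
      if (page->type() == PageType::DICTIONARY_PAGE) {
        // at most one dictionary page, ahead of the data pages
      } else {
        // DATA_PAGE or DATA_PAGE_V2
      }
    }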
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index dcf41e8..6158cb3 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -49,6 +49,12 @@
 
 namespace parquet {
 
+// 16 MB is the default maximum page header size
+static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
+
+// 16 KB is the default expected page header size
+static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
+
 namespace BitUtil = ::arrow::BitUtil;
 
 class PARQUET_EXPORT LevelDecoder {
@@ -72,6 +78,24 @@
   std::unique_ptr<::arrow::BitReader> bit_packed_decoder_;
 };
 
+// Abstract page iterator interface. This way, we can feed column pages to the
+// ColumnReader through whatever mechanism we choose
+class PARQUET_EXPORT PageReader {
+ public:
+  virtual ~PageReader() = default;
+
+  static std::unique_ptr<PageReader> Open(
+      std::unique_ptr<InputStream> stream, int64_t total_num_rows,
+      Compression::type codec,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+  // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
+  // containing new Page otherwise
+  virtual std::shared_ptr<Page> NextPage() = 0;
+
+  virtual void set_max_page_header_size(uint32_t size) = 0;
+};
+
 class PARQUET_EXPORT ColumnReader {
  public:
   ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
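
Because PageReader is now a small public interface, pages can be fed to a ColumnReader from any mechanism, as the comment above says. A hypothetical in-memory implementation (a test double, not part of this patch) takes only a few lines:

    // Hypothetical: serves a fixed sequence of pre-built pages
    class VectorPageReader : public PageReader {
     public:
      explicit VectorPageReader(std::vector<std::shared_ptr<Page>> pages)
          : pages_(std::move(pages)), pos_(0) {}

      std::shared_ptr<Page> NextPage() override {
        return pos_ < pages_.size() ? pages_[pos_++] : nullptr;  // nullptr == EOS
      }

      void set_max_page_header_size(uint32_t) override {}  // nothing to parse

     private:
      std::vector<std::shared_ptr<Page>> pages_;
      size_t pos_;
    };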
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 681f022..b4e3232 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -19,8 +19,7 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
+#include "parquet/parquet_types.h"
 #include "parquet/test-specialization.h"
 #include "parquet/test-util.h"
 #include "parquet/types.h"
@@ -63,8 +62,8 @@
                    Compression::type compression = Compression::UNCOMPRESSED) {
     auto buffer = sink_->GetBuffer();
     std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
-    std::unique_ptr<SerializedPageReader> page_reader(
-        new SerializedPageReader(std::move(source), num_rows, compression));
+    std::unique_ptr<PageReader> page_reader =
+        PageReader::Open(std::move(source), num_rows, compression);
     reader_.reset(new TypedColumnReader<TestType>(this->descr_, std::move(page_reader)));
   }
 
@@ -74,8 +73,8 @@
     sink_.reset(new InMemoryOutputStream());
     metadata_ = ColumnChunkMetaDataBuilder::Make(
         writer_properties_, this->descr_, reinterpret_cast<uint8_t*>(&thrift_metadata_));
-    std::unique_ptr<SerializedPageWriter> pager(
-        new SerializedPageWriter(sink_.get(), column_properties.codec, metadata_.get()));
+    std::unique_ptr<PageWriter> pager =
+        PageWriter::Open(sink_.get(), column_properties.codec, metadata_.get());
     WriterProperties::Builder wp_builder;
     if (column_properties.encoding == Encoding::PLAIN_DICTIONARY ||
         column_properties.encoding == Encoding::RLE_DICTIONARY) {
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 4aadf2b..bdaa9f6 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -17,12 +17,17 @@
 
 #include "parquet/column_writer.h"
 
+#include <cstdint>
+#include <memory>
+
 #include "arrow/util/bit-util.h"
+#include "arrow/util/compression.h"
 #include "arrow/util/rle-encoding.h"
 
 #include "parquet/encoding-internal.h"
 #include "parquet/properties.h"
 #include "parquet/statistics.h"
+#include "parquet/thrift.h"
 #include "parquet/util/logging.h"
 #include "parquet/util/memory.h"
 
@@ -104,6 +109,169 @@
 }
 
 // ----------------------------------------------------------------------
+// PageWriter implementation
+
+static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics) {
+  format::Statistics statistics;
+  if (row_group_statistics.has_min) statistics.__set_min(row_group_statistics.min());
+  if (row_group_statistics.has_max) statistics.__set_max(row_group_statistics.max());
+  if (row_group_statistics.has_null_count)
+    statistics.__set_null_count(row_group_statistics.null_count);
+  if (row_group_statistics.has_distinct_count)
+    statistics.__set_distinct_count(row_group_statistics.distinct_count);
+  return statistics;
+}
+
+// This subclass delimits pages appearing in a serialized stream, each preceded
+// by a serialized Thrift format::PageHeader indicating the type of each page
+// and the page metadata.
+class SerializedPageWriter : public PageWriter {
+ public:
+  SerializedPageWriter(OutputStream* sink, Compression::type codec,
+                       ColumnChunkMetaDataBuilder* metadata,
+                       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : sink_(sink),
+        metadata_(metadata),
+        pool_(pool),
+        num_values_(0),
+        dictionary_page_offset_(0),
+        data_page_offset_(0),
+        total_uncompressed_size_(0),
+        total_compressed_size_(0) {
+    compressor_ = GetCodecFromArrow(codec);
+  }
+
+  virtual ~SerializedPageWriter() = default;
+
+  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
+    int64_t uncompressed_size = page.size();
+    std::shared_ptr<Buffer> compressed_data = nullptr;
+    if (has_compressor()) {
+      auto buffer = std::static_pointer_cast<ResizableBuffer>(
+          AllocateBuffer(pool_, uncompressed_size));
+      Compress(*(page.buffer().get()), buffer.get());
+      compressed_data = std::static_pointer_cast<Buffer>(buffer);
+    } else {
+      compressed_data = page.buffer();
+    }
+
+    format::DictionaryPageHeader dict_page_header;
+    dict_page_header.__set_num_values(page.num_values());
+    dict_page_header.__set_encoding(ToThrift(page.encoding()));
+    dict_page_header.__set_is_sorted(page.is_sorted());
+
+    format::PageHeader page_header;
+    page_header.__set_type(format::PageType::DICTIONARY_PAGE);
+    page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
+    page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
+    page_header.__set_dictionary_page_header(dict_page_header);
+    // TODO(PARQUET-594) crc checksum
+
+    int64_t start_pos = sink_->Tell();
+    if (dictionary_page_offset_ == 0) {
+      dictionary_page_offset_ = start_pos;
+    }
+    int64_t header_size =
+        SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
+    sink_->Write(compressed_data->data(), compressed_data->size());
+
+    total_uncompressed_size_ += uncompressed_size + header_size;
+    total_compressed_size_ += compressed_data->size() + header_size;
+
+    return sink_->Tell() - start_pos;
+  }
+
+  void Close(bool has_dictionary, bool fallback) override {
+    // index_page_offset = 0 since index pages are not supported
+    metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
+                      total_compressed_size_, total_uncompressed_size_, has_dictionary,
+                      fallback);
+
+    // Write metadata at end of column chunk
+    metadata_->WriteTo(sink_);
+  }
+
+  /**
+   * Compress a buffer.
+   */
+  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
+    DCHECK(compressor_ != nullptr);
+
+    // Compress the data
+    int64_t max_compressed_size =
+        compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());
+
+    // Pass Arrow Buffer's shrink_to_fit = false: the underlying buffer only
+    // keeps growing, and resizing to a smaller size does not reallocate.
+    PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));
+
+    int64_t compressed_size;
+    PARQUET_THROW_NOT_OK(
+        compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size,
+                              dest_buffer->mutable_data(), &compressed_size));
+    PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
+  }
+
+  int64_t WriteDataPage(const CompressedDataPage& page) override {
+    int64_t uncompressed_size = page.uncompressed_size();
+    std::shared_ptr<Buffer> compressed_data = page.buffer();
+
+    format::DataPageHeader data_page_header;
+    data_page_header.__set_num_values(page.num_values());
+    data_page_header.__set_encoding(ToThrift(page.encoding()));
+    data_page_header.__set_definition_level_encoding(
+        ToThrift(page.definition_level_encoding()));
+    data_page_header.__set_repetition_level_encoding(
+        ToThrift(page.repetition_level_encoding()));
+    data_page_header.__set_statistics(ToThrift(page.statistics()));
+
+    format::PageHeader page_header;
+    page_header.__set_type(format::PageType::DATA_PAGE);
+    page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
+    page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
+    page_header.__set_data_page_header(data_page_header);
+    // TODO(PARQUET-594) crc checksum
+
+    int64_t start_pos = sink_->Tell();
+    if (data_page_offset_ == 0) {
+      data_page_offset_ = start_pos;
+    }
+
+    int64_t header_size =
+        SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
+    sink_->Write(compressed_data->data(), compressed_data->size());
+
+    total_uncompressed_size_ += uncompressed_size + header_size;
+    total_compressed_size_ += compressed_data->size() + header_size;
+    num_values_ += page.num_values();
+
+    return sink_->Tell() - start_pos;
+  }
+
+  bool has_compressor() override { return (compressor_ != nullptr); }
+
+ private:
+  OutputStream* sink_;
+  ColumnChunkMetaDataBuilder* metadata_;
+  ::arrow::MemoryPool* pool_;
+  int64_t num_values_;
+  int64_t dictionary_page_offset_;
+  int64_t data_page_offset_;
+  int64_t total_uncompressed_size_;
+  int64_t total_compressed_size_;
+
+  // Compression codec to use.
+  std::unique_ptr<::arrow::Codec> compressor_;
+};
+
+std::unique_ptr<PageWriter> PageWriter::Open(OutputStream* sink, Compression::type codec,
+                                             ColumnChunkMetaDataBuilder* metadata,
+                                             ::arrow::MemoryPool* pool) {
+  return std::unique_ptr<PageWriter>(
+      new SerializedPageWriter(sink, codec, metadata, pool));
+}
+
+// ----------------------------------------------------------------------
 // ColumnWriter
 
 std::shared_ptr<WriterProperties> default_writer_properties() {
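
On the write side, Close(has_dictionary, fallback) is what turns the running counters above (num_values_, the page offsets, and the compressed/uncompressed byte totals) into the column chunk metadata. A sketch of the expected call sequence against the factory, assuming an OutputStream* sink and ColumnChunkMetaDataBuilder* metadata as above, with the page objects built elsewhere:

    std::unique_ptr<PageWriter> pager =
        PageWriter::Open(sink, Compression::UNCOMPRESSED, metadata);
    pager->WriteDictionaryPage(dict_page);  // records dictionary_page_offset_
    pager->WriteDataPage(data_page);        // records data_page_offset_, num_values_
    // dictionary encoding used, no fallback to plain encoding
    pager->Close(/*has_dictionary=*/true, /*fallback=*/false);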
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 0408747..f1c13a0 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -22,7 +22,7 @@
 
 #include "parquet/column_page.h"
 #include "parquet/encoding.h"
-#include "parquet/file/metadata.h"
+#include "parquet/metadata.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
@@ -69,6 +69,28 @@
   std::unique_ptr<::arrow::BitWriter> bit_packed_encoder_;
 };
 
+class PageWriter {
+ public:
+  virtual ~PageWriter() {}
+
+  static std::unique_ptr<PageWriter> Open(
+      OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+  // The ColumnWriter reports whether dictionary encoding was used, and whether
+  // it fell back to the default encoding after reaching the dictionary page
+  // limit
+  virtual void Close(bool has_dictionary, bool fallback) = 0;
+
+  virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
+
+  virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
+
+  virtual bool has_compressor() = 0;
+
+  virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
+};
+
 static constexpr int WRITE_BATCH_SIZE = 1000;
 class PARQUET_EXPORT ColumnWriter {
  public:
diff --git a/src/parquet/encoding-benchmark.cc b/src/parquet/encoding-benchmark.cc
index 97eeefa..72c41e5 100644
--- a/src/parquet/encoding-benchmark.cc
+++ b/src/parquet/encoding-benchmark.cc
@@ -18,7 +18,6 @@
 #include "benchmark/benchmark.h"
 
 #include "parquet/encoding-internal.h"
-#include "parquet/file/reader-internal.h"
 #include "parquet/util/memory.h"
 
 using arrow::default_memory_pool;
@@ -26,7 +25,6 @@
 
 namespace parquet {
 
-using format::ColumnChunk;
 using schema::PrimitiveNode;
 
 namespace benchmark {
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file-deserialize-test.cc
similarity index 96%
rename from src/parquet/file/file-deserialize-test.cc
rename to src/parquet/file-deserialize-test.cc
index 0cab75f..5e17375 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file-deserialize-test.cc
@@ -26,9 +26,9 @@
 #include <string>
 #include <vector>
 
-#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
 #include "parquet/exception.h"
-#include "parquet/file/reader-internal.h"
+#include "parquet/file_reader.h"
 #include "parquet/parquet_types.h"
 #include "parquet/thrift.h"
 #include "parquet/types.h"
@@ -73,7 +73,7 @@
     EndStream();
     std::unique_ptr<InputStream> stream;
     stream.reset(new InMemoryInputStream(out_buffer_));
-    page_reader_.reset(new SerializedPageReader(std::move(stream), num_rows, codec));
+    page_reader_ = PageReader::Open(std::move(stream), num_rows, codec);
   }
 
   void WriteDataPageHeader(int max_serialized_len = 1024, int32_t uncompressed_size = 0,
@@ -99,7 +99,7 @@
   std::unique_ptr<InMemoryOutputStream> out_stream_;
   std::shared_ptr<Buffer> out_buffer_;
 
-  std::unique_ptr<SerializedPageReader> page_reader_;
+  std::unique_ptr<PageReader> page_reader_;
   format::PageHeader page_header_;
   format::DataPageHeader data_page_header_;
 };
@@ -149,7 +149,7 @@
 
   // check that the header size is between 256 KB and 16 MB
   ASSERT_LE(stats_size, out_stream_->Tell());
-  ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, out_stream_->Tell());
+  ASSERT_GE(kDefaultMaxPageHeaderSize, out_stream_->Tell());
 
   InitSerializedPageReader(num_rows);
   std::shared_ptr<Page> current_page = page_reader_->NextPage();
@@ -249,7 +249,7 @@
     auto reader = std::make_shared<BufferReader>(buffer);
     auto wrapper = std::unique_ptr<ArrowInputFile>(new ArrowInputFile(reader));
 
-    ASSERT_THROW(reader_->Open(SerializedFile::Open(std::move(wrapper))),
+    ASSERT_THROW(reader_->Open(ParquetFileReader::Contents::Open(std::move(wrapper))),
                  ParquetException);
   }
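
Here the test swaps the internal SerializedFile::Open for the public ParquetFileReader::Contents::Open hook, so the Serialized* naming disappears from the test entirely. Application code keeps using the unchanged convenience entry points, e.g. (hypothetical file name):

    std::unique_ptr<ParquetFileReader> reader =
        ParquetFileReader::OpenFile("example.parquet");
    std::shared_ptr<FileMetaData> file_metadata = reader->metadata();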
 
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file-serialize-test.cc
similarity index 98%
rename from src/parquet/file/file-serialize-test.cc
rename to src/parquet/file-serialize-test.cc
index 4d94d2e..b4df77e 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file-serialize-test.cc
@@ -19,8 +19,8 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
-#include "parquet/file/reader.h"
-#include "parquet/file/writer.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
 #include "parquet/test-specialization.h"
 #include "parquet/test-util.h"
 #include "parquet/types.h"
diff --git a/src/parquet/file/CMakeLists.txt b/src/parquet/file/CMakeLists.txt
deleted file mode 100644
index 82e7c80..0000000
--- a/src/parquet/file/CMakeLists.txt
+++ /dev/null
@@ -1,27 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-install(FILES
-  metadata.h
-  printer.h
-  reader.h
-  writer.h
-  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/parquet/file")
-
-ADD_PARQUET_TEST(file-deserialize-test)
-ADD_PARQUET_TEST(file-metadata-test)
-ADD_PARQUET_TEST(file-serialize-test)
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
deleted file mode 100644
index bc14ec9..0000000
--- a/src/parquet/file/reader-internal.cc
+++ /dev/null
@@ -1,309 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/reader-internal.h"
-
-#include <string.h>
-#include <algorithm>
-#include <exception>
-#include <ostream>
-#include <string>
-#include <vector>
-
-#include "arrow/util/compression.h"
-
-#include "parquet/column_page.h"
-#include "parquet/exception.h"
-#include "parquet/schema.h"
-#include "parquet/thrift.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-
-using arrow::MemoryPool;
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// SerializedPageReader deserializes Thrift metadata and pages that have been
-// assembled in a serialized stream for storing in a Parquet files
-
-SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
-                                           int64_t total_num_rows,
-                                           Compression::type codec, MemoryPool* pool)
-    : stream_(std::move(stream)),
-      decompression_buffer_(AllocateBuffer(pool, 0)),
-      seen_num_rows_(0),
-      total_num_rows_(total_num_rows) {
-  max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
-  decompressor_ = GetCodecFromArrow(codec);
-}
-
-std::shared_ptr<Page> SerializedPageReader::NextPage() {
-  // Loop here because there may be unhandled page types that we skip until
-  // finding a page that we do know what to do with
-  while (seen_num_rows_ < total_num_rows_) {
-    int64_t bytes_read = 0;
-    int64_t bytes_available = 0;
-    uint32_t header_size = 0;
-    const uint8_t* buffer;
-    uint32_t allowed_page_size = DEFAULT_PAGE_HEADER_SIZE;
-
-    // Page headers can be very large because of page statistics
-    // We try to deserialize a larger buffer progressively
-    // until a maximum allowed header limit
-    while (true) {
-      buffer = stream_->Peek(allowed_page_size, &bytes_available);
-      if (bytes_available == 0) {
-        return std::shared_ptr<Page>(nullptr);
-      }
-
-      // This gets used, then set by DeserializeThriftMsg
-      header_size = static_cast<uint32_t>(bytes_available);
-      try {
-        DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
-        break;
-      } catch (std::exception& e) {
-        // Failed to deserialize. Double the allowed page header size and try again
-        std::stringstream ss;
-        ss << e.what();
-        allowed_page_size *= 2;
-        if (allowed_page_size > max_page_header_size_) {
-          ss << "Deserializing page header failed.\n";
-          throw ParquetException(ss.str());
-        }
-      }
-    }
-    // Advance the stream offset
-    stream_->Advance(header_size);
-
-    int compressed_len = current_page_header_.compressed_page_size;
-    int uncompressed_len = current_page_header_.uncompressed_page_size;
-
-    // Read the compressed data page.
-    buffer = stream_->Read(compressed_len, &bytes_read);
-    if (bytes_read != compressed_len) {
-      ParquetException::EofException();
-    }
-
-    // Uncompress it if we need to
-    if (decompressor_ != nullptr) {
-      // Grow the uncompressed buffer if we need to.
-      if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
-        PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
-      }
-      PARQUET_THROW_NOT_OK(
-          decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
-                                    decompression_buffer_->mutable_data()));
-      buffer = decompression_buffer_->data();
-    }
-
-    auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);
-
-    if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) {
-      const format::DictionaryPageHeader& dict_header =
-          current_page_header_.dictionary_page_header;
-
-      bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false;
-
-      return std::make_shared<DictionaryPage>(page_buffer, dict_header.num_values,
-                                              FromThrift(dict_header.encoding),
-                                              is_sorted);
-    } else if (current_page_header_.type == format::PageType::DATA_PAGE) {
-      const format::DataPageHeader& header = current_page_header_.data_page_header;
-
-      EncodedStatistics page_statistics;
-      if (header.__isset.statistics) {
-        const format::Statistics& stats = header.statistics;
-        if (stats.__isset.max) {
-          page_statistics.set_max(stats.max);
-        }
-        if (stats.__isset.min) {
-          page_statistics.set_min(stats.min);
-        }
-        if (stats.__isset.null_count) {
-          page_statistics.set_null_count(stats.null_count);
-        }
-        if (stats.__isset.distinct_count) {
-          page_statistics.set_distinct_count(stats.distinct_count);
-        }
-      }
-
-      seen_num_rows_ += header.num_values;
-
-      return std::make_shared<DataPage>(
-          page_buffer, header.num_values, FromThrift(header.encoding),
-          FromThrift(header.definition_level_encoding),
-          FromThrift(header.repetition_level_encoding), page_statistics);
-    } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) {
-      const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
-      bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false;
-
-      seen_num_rows_ += header.num_values;
-
-      return std::make_shared<DataPageV2>(
-          page_buffer, header.num_values, header.num_nulls, header.num_rows,
-          FromThrift(header.encoding), header.definition_levels_byte_length,
-          header.repetition_levels_byte_length, is_compressed);
-    } else {
-      // We don't know what this page type is. We're allowed to skip non-data
-      // pages.
-      continue;
-    }
-  }
-  return std::shared_ptr<Page>(nullptr);
-}
-
-SerializedRowGroup::SerializedRowGroup(RandomAccessSource* source,
-                                       FileMetaData* file_metadata, int row_group_number,
-                                       const ReaderProperties& props)
-    : source_(source), file_metadata_(file_metadata), properties_(props) {
-  row_group_metadata_ = file_metadata->RowGroup(row_group_number);
-}
-const RowGroupMetaData* SerializedRowGroup::metadata() const {
-  return row_group_metadata_.get();
-}
-
-const ReaderProperties* SerializedRowGroup::properties() const { return &properties_; }
-
-// For PARQUET-816
-static constexpr int64_t kMaxDictHeaderSize = 100;
-
-std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
-  // Read column chunk from the file
-  auto col = row_group_metadata_->ColumnChunk(i);
-
-  int64_t col_start = col->data_page_offset();
-  if (col->has_dictionary_page() && col_start > col->dictionary_page_offset()) {
-    col_start = col->dictionary_page_offset();
-  }
-
-  int64_t col_length = col->total_compressed_size();
-  std::unique_ptr<InputStream> stream;
-
-  // PARQUET-816 workaround for old files created by older parquet-mr
-  const ApplicationVersion& version = file_metadata_->writer_version();
-  if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION)) {
-    // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
-    // dictionary page header size in total_compressed_size and total_uncompressed_size
-    // (see IMPALA-694). We add padding to compensate.
-    int64_t bytes_remaining = source_->Size() - (col_start + col_length);
-    int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
-    col_length += padding;
-  }
-
-  stream = properties_.GetStream(source_, col_start, col_length);
-
-  return std::unique_ptr<PageReader>(
-      new SerializedPageReader(std::move(stream), col->num_values(), col->compression(),
-                               properties_.memory_pool()));
-}
-
-// ----------------------------------------------------------------------
-// SerializedFile: Parquet on-disk layout
-
-// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
-static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = 64 * 1024;
-static constexpr uint32_t FOOTER_SIZE = 8;
-static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
-
-std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open(
-    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
-    const std::shared_ptr<FileMetaData>& metadata) {
-  std::unique_ptr<ParquetFileReader::Contents> result(
-      new SerializedFile(std::move(source), props));
-
-  // Access private methods here, but otherwise unavailable
-  SerializedFile* file = static_cast<SerializedFile*>(result.get());
-
-  if (metadata == nullptr) {
-    // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
-    file->ParseMetaData();
-  } else {
-    file->file_metadata_ = metadata;
-  }
-
-  return result;
-}
-
-void SerializedFile::Close() { source_->Close(); }
-
-SerializedFile::~SerializedFile() {
-  try {
-    Close();
-  } catch (...) {
-  }
-}
-
-std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) {
-  std::unique_ptr<SerializedRowGroup> contents(
-      new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_));
-  return std::make_shared<RowGroupReader>(std::move(contents));
-}
-
-std::shared_ptr<FileMetaData> SerializedFile::metadata() const { return file_metadata_; }
-
-SerializedFile::SerializedFile(
-    std::unique_ptr<RandomAccessSource> source,
-    const ReaderProperties& props = default_reader_properties())
-    : source_(std::move(source)), properties_(props) {}
-
-void SerializedFile::ParseMetaData() {
-  int64_t file_size = source_->Size();
-
-  if (file_size < FOOTER_SIZE) {
-    throw ParquetException("Corrupted file, smaller than file footer");
-  }
-
-  uint8_t footer_buffer[DEFAULT_FOOTER_READ_SIZE];
-  int64_t footer_read_size = std::min(file_size, DEFAULT_FOOTER_READ_SIZE);
-  int64_t bytes_read =
-      source_->ReadAt(file_size - footer_read_size, footer_read_size, footer_buffer);
-
-  // Check if all bytes are read. Check if last 4 bytes read have the magic bits
-  if (bytes_read != footer_read_size ||
-      memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) {
-    throw ParquetException("Invalid parquet file. Corrupt footer.");
-  }
-
-  uint32_t metadata_len =
-      *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - FOOTER_SIZE);
-  int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
-  if (FOOTER_SIZE + metadata_len > file_size) {
-    throw ParquetException(
-        "Invalid parquet file. File is less than "
-        "file metadata size.");
-  }
-
-  std::shared_ptr<PoolBuffer> metadata_buffer =
-      AllocateBuffer(properties_.memory_pool(), metadata_len);
-
-  // Check if the footer_buffer contains the entire metadata
-  if (footer_read_size >= (metadata_len + FOOTER_SIZE)) {
-    memcpy(metadata_buffer->mutable_data(),
-           footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE), metadata_len);
-  } else {
-    bytes_read =
-        source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data());
-    if (bytes_read != metadata_len) {
-      throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
-    }
-  }
-
-  file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
-}
-
-}  // namespace parquet
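
The ParseMetaData logic deleted above (it now lives in src/parquet/file_reader.cc) depends on the fixed Parquet footer layout, which is worth spelling out:

    // Tail of every Parquet file, as read by ParseMetaData:
    //   ...row groups / column chunks...
    //   FileMetaData   (Thrift compact protocol, metadata_len bytes)
    //   metadata_len   (uint32, little-endian)
    //   "PAR1"         (4-byte magic)
    //
    // FOOTER_SIZE == 8 covers the length word plus the magic, and a single
    // 64 KB ReadAt (DEFAULT_FOOTER_READ_SIZE, PARQUET-978) usually captures
    // the whole footer.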
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
deleted file mode 100644
index 282c534..0000000
--- a/src/parquet/file/reader-internal.h
+++ /dev/null
@@ -1,133 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_FILE_READER_INTERNAL_H
-#define PARQUET_FILE_READER_INTERNAL_H
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-#include "parquet/column_page.h"
-#include "parquet/file/metadata.h"
-#include "parquet/file/reader.h"
-#include "parquet/parquet_types.h"
-#include "parquet/properties.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/visibility.h"
-
-namespace arrow {
-
-class Codec;
-};
-
-namespace parquet {
-
-// 16 MB is the default maximum page header size
-static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024;
-
-// 16 KB is the default expected page header size
-static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024;
-
-// This subclass delimits pages appearing in a serialized stream, each preceded
-// by a serialized Thrift format::PageHeader indicating the type of each page
-// and the page metadata.
-class PARQUET_EXPORT SerializedPageReader : public PageReader {
- public:
-  SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t num_rows,
-                       Compression::type codec,
-                       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
-  virtual ~SerializedPageReader() {}
-
-  // Implement the PageReader interface
-  virtual std::shared_ptr<Page> NextPage();
-
-  void set_max_page_header_size(uint32_t size) { max_page_header_size_ = size; }
-
- private:
-  std::unique_ptr<InputStream> stream_;
-
-  format::PageHeader current_page_header_;
-  std::shared_ptr<Page> current_page_;
-
-  // Compression codec to use.
-  std::unique_ptr<::arrow::Codec> decompressor_;
-  std::shared_ptr<PoolBuffer> decompression_buffer_;
-
-  // Maximum allowed page size
-  uint32_t max_page_header_size_;
-
-  // Number of rows read in data pages so far
-  int64_t seen_num_rows_;
-
-  // Number of rows in all the data pages
-  int64_t total_num_rows_;
-};
-
-// RowGroupReader::Contents implementation for the Parquet file specification
-class PARQUET_EXPORT SerializedRowGroup : public RowGroupReader::Contents {
- public:
-  SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata,
-                     int row_group_number, const ReaderProperties& props);
-
-  virtual const RowGroupMetaData* metadata() const;
-
-  virtual const ReaderProperties* properties() const;
-
-  virtual std::unique_ptr<PageReader> GetColumnPageReader(int i);
-
- private:
-  RandomAccessSource* source_;
-  FileMetaData* file_metadata_;
-  std::unique_ptr<RowGroupMetaData> row_group_metadata_;
-  ReaderProperties properties_;
-};
-
-// An implementation of ParquetFileReader::Contents that deals with the Parquet
-// file structure, Thrift deserialization, and other internal matters
-
-class PARQUET_EXPORT SerializedFile : public ParquetFileReader::Contents {
- public:
-  // Open the file. If no metadata is passed, it is parsed from the footer of
-  // the file
-  static std::unique_ptr<ParquetFileReader::Contents> Open(
-      std::unique_ptr<RandomAccessSource> source,
-      const ReaderProperties& props = default_reader_properties(),
-      const std::shared_ptr<FileMetaData>& metadata = nullptr);
-
-  void Close() override;
-  std::shared_ptr<RowGroupReader> GetRowGroup(int i) override;
-  std::shared_ptr<FileMetaData> metadata() const override;
-  virtual ~SerializedFile();
-
- private:
-  // This class takes ownership of the provided data source
-  explicit SerializedFile(std::unique_ptr<RandomAccessSource> source,
-                          const ReaderProperties& props);
-
-  std::unique_ptr<RandomAccessSource> source_;
-  std::shared_ptr<FileMetaData> file_metadata_;
-  ReaderProperties properties_;
-
-  void ParseMetaData();
-};
-
-}  // namespace parquet
-
-#endif  // PARQUET_FILE_READER_INTERNAL_H
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
deleted file mode 100644
index 9b9bde9..0000000
--- a/src/parquet/file/reader.cc
+++ /dev/null
@@ -1,192 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/reader.h"
-
-#include <cstdio>
-#include <memory>
-#include <sstream>
-#include <string>
-#include <utility>
-
-#include "arrow/io/file.h"
-
-#include "parquet/column_page.h"
-#include "parquet/column_reader.h"
-#include "parquet/column_scanner.h"
-#include "parquet/exception.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/types.h"
-#include "parquet/util/logging.h"
-#include "parquet/util/memory.h"
-
-using std::string;
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// RowGroupReader public API
-
-RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents)
-    : contents_(std::move(contents)) {}
-
-std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
-  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
-                                        << metadata()->num_columns()
-                                        << "columns, requested column: " << i;
-  const ColumnDescriptor* descr = metadata()->schema()->Column(i);
-
-  std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
-  return ColumnReader::Make(
-      descr, std::move(page_reader),
-      const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
-}
-
-std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
-  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
-                                        << metadata()->num_columns()
-                                        << "columns, requested column: " << i;
-  return contents_->GetColumnPageReader(i);
-}
-
-// Returns the rowgroup metadata
-const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
-
-// ----------------------------------------------------------------------
-// ParquetFileReader public API
-
-ParquetFileReader::ParquetFileReader() {}
-ParquetFileReader::~ParquetFileReader() {
-  try {
-    Close();
-  } catch (...) {
-  }
-}
-
-std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
-    const std::shared_ptr<::arrow::io::ReadableFileInterface>& source,
-    const ReaderProperties& props, const std::shared_ptr<FileMetaData>& metadata) {
-  std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(source));
-  return Open(std::move(io_wrapper), props, metadata);
-}
-
-std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
-    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
-    const std::shared_ptr<FileMetaData>& metadata) {
-  auto contents = SerializedFile::Open(std::move(source), props, metadata);
-  std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
-  result->Open(std::move(contents));
-  return result;
-}
-
-std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
-    const std::string& path, bool memory_map, const ReaderProperties& props,
-    const std::shared_ptr<FileMetaData>& metadata) {
-  std::shared_ptr<::arrow::io::ReadableFileInterface> source;
-  if (memory_map) {
-    std::shared_ptr<::arrow::io::MemoryMappedFile> handle;
-    PARQUET_THROW_NOT_OK(
-        ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ, &handle));
-    source = handle;
-  } else {
-    std::shared_ptr<::arrow::io::ReadableFile> handle;
-    PARQUET_THROW_NOT_OK(
-        ::arrow::io::ReadableFile::Open(path, props.memory_pool(), &handle));
-    source = handle;
-  }
-
-  return Open(source, props, metadata);
-}
-
-void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
-  contents_ = std::move(contents);
-}
-
-void ParquetFileReader::Close() {
-  if (contents_) {
-    contents_->Close();
-  }
-}
-
-std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const {
-  return contents_->metadata();
-}
-
-std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
-  DCHECK(i < metadata()->num_row_groups()) << "The file only has "
-                                           << metadata()->num_row_groups()
-                                           << "row groups, requested reader for: " << i;
-  return contents_->GetRowGroup(i);
-}
-
-// ----------------------------------------------------------------------
-// File metadata helpers
-
-std::shared_ptr<FileMetaData> ReadMetaData(
-    const std::shared_ptr<::arrow::io::ReadableFileInterface>& source) {
-  return ParquetFileReader::Open(source)->metadata();
-}
-
-// ----------------------------------------------------------------------
-// File scanner for performance testing
-
-int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
-                         ParquetFileReader* reader) {
-  std::vector<int16_t> rep_levels(column_batch_size);
-  std::vector<int16_t> def_levels(column_batch_size);
-
-  int num_columns = static_cast<int>(columns.size());
-
-  // columns are not specified explicitly. Add all columns
-  if (columns.size() == 0) {
-    num_columns = reader->metadata()->num_columns();
-    columns.resize(num_columns);
-    for (int i = 0; i < num_columns; i++) {
-      columns[i] = i;
-    }
-  }
-
-  std::vector<int64_t> total_rows(num_columns, 0);
-
-  for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) {
-    auto group_reader = reader->RowGroup(r);
-    int col = 0;
-    for (auto i : columns) {
-      std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
-      size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type());
-      std::vector<uint8_t> values(column_batch_size * value_byte_size);
-
-      int64_t values_read = 0;
-      while (col_reader->HasNext()) {
-        total_rows[col] +=
-            ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(),
-                          values.data(), &values_read, col_reader.get());
-      }
-      col++;
-    }
-  }
-
-  for (int i = 1; i < num_columns; ++i) {
-    if (total_rows[0] != total_rows[i]) {
-      throw ParquetException("Parquet error: Total rows among columns do not match");
-    }
-  }
-
-  return total_rows[0];
-}
-
-}  // namespace parquet
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
deleted file mode 100644
index 712289b..0000000
--- a/src/parquet/file/writer-internal.cc
+++ /dev/null
@@ -1,327 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/writer-internal.h"
-
-#include <cstdint>
-#include <memory>
-
-#include "arrow/util/compression.h"
-
-#include "parquet/column_writer.h"
-#include "parquet/schema-internal.h"
-#include "parquet/schema.h"
-#include "parquet/thrift.h"
-#include "parquet/util/memory.h"
-
-using arrow::MemoryPool;
-
-using parquet::schema::GroupNode;
-using parquet::schema::SchemaFlattener;
-
-namespace parquet {
-
-// FIXME: copied from reader-internal.cc
-static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
-
-// ----------------------------------------------------------------------
-// SerializedPageWriter
-
-SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type codec,
-                                           ColumnChunkMetaDataBuilder* metadata,
-                                           MemoryPool* pool)
-    : sink_(sink),
-      metadata_(metadata),
-      pool_(pool),
-      num_values_(0),
-      dictionary_page_offset_(0),
-      data_page_offset_(0),
-      total_uncompressed_size_(0),
-      total_compressed_size_(0) {
-  compressor_ = GetCodecFromArrow(codec);
-}
-
-static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics) {
-  format::Statistics statistics;
-  if (row_group_statistics.has_min) statistics.__set_min(row_group_statistics.min());
-  if (row_group_statistics.has_max) statistics.__set_max(row_group_statistics.max());
-  if (row_group_statistics.has_null_count)
-    statistics.__set_null_count(row_group_statistics.null_count);
-  if (row_group_statistics.has_distinct_count)
-    statistics.__set_distinct_count(row_group_statistics.distinct_count);
-  return statistics;
-}
-
-void SerializedPageWriter::Close(bool has_dictionary, bool fallback) {
-  // index_page_offset = 0 since they are not supported
-  metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
-                    total_compressed_size_, total_uncompressed_size_, has_dictionary,
-                    fallback);
-
-  // Write metadata at end of column chunk
-  metadata_->WriteTo(sink_);
-}
-
-void SerializedPageWriter::Compress(const Buffer& src_buffer,
-                                    ResizableBuffer* dest_buffer) {
-  DCHECK(compressor_ != nullptr);
-
-  // Compress the data
-  int64_t max_compressed_size =
-      compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());
-
-  // Use Arrow::Buffer::shrink_to_fit = false
-  // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
-  PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));
-
-  int64_t compressed_size;
-  PARQUET_THROW_NOT_OK(
-      compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size,
-                            dest_buffer->mutable_data(), &compressed_size));
-  PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
-}
-
-int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {
-  int64_t uncompressed_size = page.uncompressed_size();
-  std::shared_ptr<Buffer> compressed_data = page.buffer();
-
-  format::DataPageHeader data_page_header;
-  data_page_header.__set_num_values(page.num_values());
-  data_page_header.__set_encoding(ToThrift(page.encoding()));
-  data_page_header.__set_definition_level_encoding(
-      ToThrift(page.definition_level_encoding()));
-  data_page_header.__set_repetition_level_encoding(
-      ToThrift(page.repetition_level_encoding()));
-  data_page_header.__set_statistics(ToThrift(page.statistics()));
-
-  format::PageHeader page_header;
-  page_header.__set_type(format::PageType::DATA_PAGE);
-  page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
-  page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
-  page_header.__set_data_page_header(data_page_header);
-  // TODO(PARQUET-594) crc checksum
-
-  int64_t start_pos = sink_->Tell();
-  if (data_page_offset_ == 0) {
-    data_page_offset_ = start_pos;
-  }
-
-  int64_t header_size =
-      SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
-  sink_->Write(compressed_data->data(), compressed_data->size());
-
-  total_uncompressed_size_ += uncompressed_size + header_size;
-  total_compressed_size_ += compressed_data->size() + header_size;
-  num_values_ += page.num_values();
-
-  return sink_->Tell() - start_pos;
-}
-
-int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {
-  int64_t uncompressed_size = page.size();
-  std::shared_ptr<Buffer> compressed_data = nullptr;
-  if (has_compressor()) {
-    auto buffer = std::static_pointer_cast<ResizableBuffer>(
-        AllocateBuffer(pool_, uncompressed_size));
-    Compress(*(page.buffer().get()), buffer.get());
-    compressed_data = std::static_pointer_cast<Buffer>(buffer);
-  } else {
-    compressed_data = page.buffer();
-  }
-
-  format::DictionaryPageHeader dict_page_header;
-  dict_page_header.__set_num_values(page.num_values());
-  dict_page_header.__set_encoding(ToThrift(page.encoding()));
-  dict_page_header.__set_is_sorted(page.is_sorted());
-
-  format::PageHeader page_header;
-  page_header.__set_type(format::PageType::DICTIONARY_PAGE);
-  page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
-  page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
-  page_header.__set_dictionary_page_header(dict_page_header);
-  // TODO(PARQUET-594) crc checksum
-
-  int64_t start_pos = sink_->Tell();
-  if (dictionary_page_offset_ == 0) {
-    dictionary_page_offset_ = start_pos;
-  }
-  int64_t header_size =
-      SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
-  sink_->Write(compressed_data->data(), compressed_data->size());
-
-  total_uncompressed_size_ += uncompressed_size + header_size;
-  total_compressed_size_ += compressed_data->size() + header_size;
-
-  return sink_->Tell() - start_pos;
-}
-
-// ----------------------------------------------------------------------
-// RowGroupSerializer
-
-int RowGroupSerializer::num_columns() const { return metadata_->num_columns(); }
-
-int64_t RowGroupSerializer::num_rows() const {
-  if (current_column_writer_) {
-    CheckRowsWritten();
-  }
-  return num_rows_ < 0 ? 0 : num_rows_;
-}
-
-void RowGroupSerializer::CheckRowsWritten() const {
-  int64_t current_rows = current_column_writer_->rows_written();
-  if (num_rows_ < 0) {
-    num_rows_ = current_rows;
-    metadata_->set_num_rows(current_rows);
-  } else if (num_rows_ != current_rows) {
-    std::stringstream ss;
-    ss << "Column " << current_column_index_ << " had " << current_rows
-       << " while previous column had " << num_rows_;
-    throw ParquetException(ss.str());
-  }
-}
-
-ColumnWriter* RowGroupSerializer::NextColumn() {
-  if (current_column_writer_) {
-    CheckRowsWritten();
-  }
-
-  // Throws an error if more columns are being written
-  auto col_meta = metadata_->NextColumnChunk();
-
-  if (current_column_writer_) {
-    total_bytes_written_ += current_column_writer_->Close();
-  }
-
-  ++current_column_index_;
-
-  const ColumnDescriptor* column_descr = col_meta->descr();
-  std::unique_ptr<PageWriter> pager(
-      new SerializedPageWriter(sink_, properties_->compression(column_descr->path()),
-                               col_meta, properties_->memory_pool()));
-  current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), properties_);
-  return current_column_writer_.get();
-}
-
-int RowGroupSerializer::current_column() const { return metadata_->current_column(); }
-
-void RowGroupSerializer::Close() {
-  if (!closed_) {
-    closed_ = true;
-
-    if (current_column_writer_) {
-      CheckRowsWritten();
-      total_bytes_written_ += current_column_writer_->Close();
-      current_column_writer_.reset();
-    }
-
-    // Ensures all columns have been written
-    metadata_->Finish(total_bytes_written_);
-  }
-}
-
-// ----------------------------------------------------------------------
-// FileSerializer
-
-std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open(
-    const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
-  std::unique_ptr<ParquetFileWriter::Contents> result(
-      new FileSerializer(sink, schema, properties, key_value_metadata));
-
-  return result;
-}
-
-void FileSerializer::Close() {
-  if (is_open_) {
-    if (row_group_writer_) {
-      num_rows_ += row_group_writer_->num_rows();
-      row_group_writer_->Close();
-    }
-    row_group_writer_.reset();
-
-    // Write magic bytes and metadata
-    WriteMetaData();
-
-    sink_->Close();
-    is_open_ = false;
-  }
-}
-
-int FileSerializer::num_columns() const { return schema_.num_columns(); }
-
-int FileSerializer::num_row_groups() const { return num_row_groups_; }
-
-int64_t FileSerializer::num_rows() const { return num_rows_; }
-
-const std::shared_ptr<WriterProperties>& FileSerializer::properties() const {
-  return properties_;
-}
-
-RowGroupWriter* FileSerializer::AppendRowGroup() {
-  if (row_group_writer_) {
-    row_group_writer_->Close();
-  }
-  num_row_groups_++;
-  auto rg_metadata = metadata_->AppendRowGroup();
-  std::unique_ptr<RowGroupWriter::Contents> contents(
-      new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
-  row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
-  return row_group_writer_.get();
-}
-
-FileSerializer::~FileSerializer() {
-  try {
-    Close();
-  } catch (...) {
-  }
-}
-
-void FileSerializer::WriteMetaData() {
-  // Write MetaData
-  uint32_t metadata_len = static_cast<uint32_t>(sink_->Tell());
-
-  // Get a FileMetaData
-  auto metadata = metadata_->Finish();
-  metadata->WriteTo(sink_.get());
-  metadata_len = static_cast<uint32_t>(sink_->Tell()) - metadata_len;
-
-  // Write Footer
-  sink_->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
-  sink_->Write(PARQUET_MAGIC, 4);
-}
-
-FileSerializer::FileSerializer(
-    const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
-    : ParquetFileWriter::Contents(schema, key_value_metadata),
-      sink_(sink),
-      is_open_(true),
-      properties_(properties),
-      num_row_groups_(0),
-      num_rows_(0),
-      metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) {
-  StartFile();
-}
-
-void FileSerializer::StartFile() {
-  // Parquet files always start with PAR1
-  sink_->Write(PARQUET_MAGIC, 4);
-}
-
-}  // namespace parquet
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
deleted file mode 100644
index 3cd73fe..0000000
--- a/src/parquet/file/writer-internal.h
+++ /dev/null
@@ -1,153 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_FILE_WRITER_INTERNAL_H
-#define PARQUET_FILE_WRITER_INTERNAL_H
-
-#include <memory>
-#include <vector>
-
-#include "parquet/column_page.h"
-#include "parquet/file/metadata.h"
-#include "parquet/file/writer.h"
-#include "parquet/parquet_types.h"
-#include "parquet/util/memory.h"
-
-namespace arrow {
-
-class Codec;
-};
-
-namespace parquet {
-
-// This subclass delimits pages appearing in a serialized stream, each preceded
-// by a serialized Thrift format::PageHeader indicating the type of each page
-// and the page metadata.
-class SerializedPageWriter : public PageWriter {
- public:
-  SerializedPageWriter(OutputStream* sink, Compression::type codec,
-                       ColumnChunkMetaDataBuilder* metadata,
-                       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
-  virtual ~SerializedPageWriter() {}
-
-  int64_t WriteDataPage(const CompressedDataPage& page) override;
-
-  int64_t WriteDictionaryPage(const DictionaryPage& page) override;
-
-  /**
-   * Compress a buffer.
-   */
-  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override;
-
-  bool has_compressor() override { return (compressor_ != nullptr); }
-
-  void Close(bool has_dictionary, bool fallback) override;
-
- private:
-  OutputStream* sink_;
-  ColumnChunkMetaDataBuilder* metadata_;
-  ::arrow::MemoryPool* pool_;
-  int64_t num_values_;
-  int64_t dictionary_page_offset_;
-  int64_t data_page_offset_;
-  int64_t total_uncompressed_size_;
-  int64_t total_compressed_size_;
-
-  // Compression codec to use.
-  std::unique_ptr<::arrow::Codec> compressor_;
-};
-
-// RowGroupWriter::Contents implementation for the Parquet file specification
-class RowGroupSerializer : public RowGroupWriter::Contents {
- public:
-  RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
-                     const WriterProperties* properties)
-      : sink_(sink),
-        metadata_(metadata),
-        properties_(properties),
-        total_bytes_written_(0),
-        closed_(false),
-        current_column_index_(0),
-        num_rows_(-1) {}
-
-  int num_columns() const override;
-  int64_t num_rows() const override;
-
-  ColumnWriter* NextColumn() override;
-  int current_column() const override;
-  void Close() override;
-
- private:
-  OutputStream* sink_;
-  mutable RowGroupMetaDataBuilder* metadata_;
-  const WriterProperties* properties_;
-  int64_t total_bytes_written_;
-  bool closed_;
-  int current_column_index_;
-  mutable int64_t num_rows_;
-
-  void CheckRowsWritten() const;
-
-  std::shared_ptr<ColumnWriter> current_column_writer_;
-};
-
-// An implementation of ParquetFileWriter::Contents that deals with the Parquet
-// file structure, Thrift serialization, and other internal matters
-
-class FileSerializer : public ParquetFileWriter::Contents {
- public:
-  static std::unique_ptr<ParquetFileWriter::Contents> Open(
-      const std::shared_ptr<OutputStream>& sink,
-      const std::shared_ptr<schema::GroupNode>& schema,
-      const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
-      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr);
-
-  void Close() override;
-
-  RowGroupWriter* AppendRowGroup() override;
-
-  const std::shared_ptr<WriterProperties>& properties() const override;
-
-  int num_columns() const override;
-  int num_row_groups() const override;
-  int64_t num_rows() const override;
-
-  virtual ~FileSerializer();
-
- private:
-  explicit FileSerializer(
-      const std::shared_ptr<OutputStream>& sink,
-      const std::shared_ptr<schema::GroupNode>& schema,
-      const std::shared_ptr<WriterProperties>& properties,
-      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
-
-  std::shared_ptr<OutputStream> sink_;
-  bool is_open_;
-  const std::shared_ptr<WriterProperties> properties_;
-  int num_row_groups_;
-  int64_t num_rows_;
-  std::unique_ptr<FileMetaDataBuilder> metadata_;
-  std::unique_ptr<RowGroupWriter> row_group_writer_;
-
-  void StartFile();
-  void WriteMetaData();
-};
-
-}  // namespace parquet
-
-#endif  // PARQUET_FILE_WRITER_INTERNAL_H
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
deleted file mode 100644
index a91553b..0000000
--- a/src/parquet/file/writer.cc
+++ /dev/null
@@ -1,119 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/writer.h"
-
-#include "parquet/file/writer-internal.h"
-#include "parquet/util/memory.h"
-
-using parquet::schema::GroupNode;
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// RowGroupWriter public API
-
-RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
-    : contents_(std::move(contents)) {}
-
-void RowGroupWriter::Close() {
-  if (contents_) {
-    contents_->Close();
-  }
-}
-
-ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
-
-int RowGroupWriter::current_column() { return contents_->current_column(); }
-
-int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
-
-int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
-
-// ----------------------------------------------------------------------
-// ParquetFileWriter public API
-
-ParquetFileWriter::ParquetFileWriter() {}
-
-ParquetFileWriter::~ParquetFileWriter() {
-  try {
-    Close();
-  } catch (...) {
-  }
-}
-
-std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
-    const std::shared_ptr<::arrow::io::OutputStream>& sink,
-    const std::shared_ptr<GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
-  return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties,
-              key_value_metadata);
-}
-
-std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
-    const std::shared_ptr<OutputStream>& sink,
-    const std::shared_ptr<schema::GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
-  auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata);
-  std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
-  result->Open(std::move(contents));
-  return result;
-}
-
-const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
-
-const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
-  return contents_->schema()->Column(i);
-}
-
-int ParquetFileWriter::num_columns() const { return contents_->num_columns(); }
-
-int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); }
-
-int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); }
-
-const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata()
-    const {
-  return contents_->key_value_metadata();
-}
-
-void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
-  contents_ = std::move(contents);
-}
-
-void ParquetFileWriter::Close() {
-  if (contents_) {
-    contents_->Close();
-    contents_.reset();
-  }
-}
-
-RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
-  return contents_->AppendRowGroup();
-}
-
-RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
-  return AppendRowGroup();
-}
-
-const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
-  return contents_->properties();
-}
-
-}  // namespace parquet
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc
new file mode 100644
index 0000000..72c71c6
--- /dev/null
+++ b/src/parquet/file_reader.cc
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_reader.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "arrow/io/file.h"
+
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_scanner.h"
+#include "parquet/exception.h"
+#include "parquet/metadata.h"
+#include "parquet/parquet_types.h"
+#include "parquet/properties.h"
+#include "parquet/types.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/memory.h"
+
+using std::string;
+
+namespace arrow {
+
+class Codec;
+
+}  // namespace arrow
+
+namespace parquet {
+
+// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
+static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = 64 * 1024;
+static constexpr uint32_t FOOTER_SIZE = 8;
+static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+
+// For PARQUET-816
+static constexpr int64_t kMaxDictHeaderSize = 100;
+
+// ----------------------------------------------------------------------
+// RowGroupReader public API
+
+RowGroupReader::RowGroupReader(std::unique_ptr<Contents> contents)
+    : contents_(std::move(contents)) {}
+
+std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
+  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+                                        << metadata()->num_columns()
+                                        << " columns, requested column: " << i;
+  const ColumnDescriptor* descr = metadata()->schema()->Column(i);
+
+  std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
+  return ColumnReader::Make(
+      descr, std::move(page_reader),
+      const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
+}
+
+std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
+  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+                                        << metadata()->num_columns()
+                                        << " columns, requested column: " << i;
+  return contents_->GetColumnPageReader(i);
+}
+
+// Returns the rowgroup metadata
+const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
+
+// RowGroupReader::Contents implementation for the Parquet file specification
+class SerializedRowGroup : public RowGroupReader::Contents {
+ public:
+  SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata,
+                     int row_group_number, const ReaderProperties& props)
+      : source_(source), file_metadata_(file_metadata), properties_(props) {
+    row_group_metadata_ = file_metadata->RowGroup(row_group_number);
+  }
+
+  const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); }
+
+  const ReaderProperties* properties() const override { return &properties_; }
+
+  std::unique_ptr<PageReader> GetColumnPageReader(int i) override {
+    // Read column chunk from the file
+    auto col = row_group_metadata_->ColumnChunk(i);
+
+    int64_t col_start = col->data_page_offset();
+    if (col->has_dictionary_page() && col_start > col->dictionary_page_offset()) {
+      col_start = col->dictionary_page_offset();
+    }
+
+    int64_t col_length = col->total_compressed_size();
+    std::unique_ptr<InputStream> stream;
+
+    // PARQUET-816 workaround for old files created by older parquet-mr
+    const ApplicationVersion& version = file_metadata_->writer_version();
+    if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION)) {
+      // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+      // dictionary page header size in total_compressed_size and total_uncompressed_size
+      // (see IMPALA-694). We add padding to compensate.
+      int64_t bytes_remaining = source_->Size() - (col_start + col_length);
+      int64_t padding = std::min<int64_t>(kMaxDictHeaderSize, bytes_remaining);
+      col_length += padding;
+    }
+
+    stream = properties_.GetStream(source_, col_start, col_length);
+
+    return PageReader::Open(std::move(stream), col->num_values(), col->compression(),
+                            properties_.memory_pool());
+  }
+
+ private:
+  RandomAccessSource* source_;
+  FileMetaData* file_metadata_;
+  std::unique_ptr<RowGroupMetaData> row_group_metadata_;
+  ReaderProperties properties_;
+};
+
+// ----------------------------------------------------------------------
+// SerializedFile: An implementation of ParquetFileReader::Contents that deals
+// with the Parquet file structure, Thrift deserialization, and other internal
+// matters
+
+// This class takes ownership of the provided data source
+class SerializedFile : public ParquetFileReader::Contents {
+ public:
+  SerializedFile(std::unique_ptr<RandomAccessSource> source,
+                 const ReaderProperties& props = default_reader_properties())
+      : source_(std::move(source)), properties_(props) {}
+
+  ~SerializedFile() {
+    try {
+      Close();
+    } catch (...) {
+    }
+  }
+
+  void Close() override { source_->Close(); }
+
+  std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
+    std::unique_ptr<SerializedRowGroup> contents(
+        new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_));
+    return std::make_shared<RowGroupReader>(std::move(contents));
+  }
+
+  std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; }
+
+  void set_metadata(const std::shared_ptr<FileMetaData>& metadata) {
+    file_metadata_ = metadata;
+  }
+
+  void ParseMetaData() {
+    int64_t file_size = source_->Size();
+
+    if (file_size < FOOTER_SIZE) {
+      throw ParquetException("Corrupted file, smaller than file footer");
+    }
+
+    uint8_t footer_buffer[DEFAULT_FOOTER_READ_SIZE];
+    int64_t footer_read_size = std::min(file_size, DEFAULT_FOOTER_READ_SIZE);
+    int64_t bytes_read =
+        source_->ReadAt(file_size - footer_read_size, footer_read_size, footer_buffer);
+
+    // Check that all bytes were read and that the last four bytes are the PAR1 magic
+    if (bytes_read != footer_read_size ||
+        memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) {
+      throw ParquetException("Invalid parquet file. Corrupt footer.");
+    }
+
+    uint32_t metadata_len =
+        *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - FOOTER_SIZE);
+    int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
+    if (FOOTER_SIZE + static_cast<int64_t>(metadata_len) > file_size) {
+      throw ParquetException(
+          "Invalid parquet file. File is smaller than the reported "
+          "metadata size.");
+    }
+
+    std::shared_ptr<PoolBuffer> metadata_buffer =
+        AllocateBuffer(properties_.memory_pool(), metadata_len);
+
+    // Check if the footer_buffer contains the entire metadata
+    if (footer_read_size >= (metadata_len + FOOTER_SIZE)) {
+      memcpy(metadata_buffer->mutable_data(),
+             footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE),
+             metadata_len);
+    } else {
+      bytes_read =
+          source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data());
+      if (bytes_read != metadata_len) {
+        throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
+      }
+    }
+
+    file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
+  }
+
+ private:
+  std::unique_ptr<RandomAccessSource> source_;
+  std::shared_ptr<FileMetaData> file_metadata_;
+  ReaderProperties properties_;
+};
+
+// ----------------------------------------------------------------------
+// ParquetFileReader public API
+
+ParquetFileReader::ParquetFileReader() {}
+
+ParquetFileReader::~ParquetFileReader() {
+  try {
+    Close();
+  } catch (...) {
+  }
+}
+
+// Open the file. If no metadata is passed, it is parsed from the footer of
+// the file
+std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
+    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
+    const std::shared_ptr<FileMetaData>& metadata) {
+  std::unique_ptr<ParquetFileReader::Contents> result(
+      new SerializedFile(std::move(source), props));
+
+  // Downcast to SerializedFile to reach methods not exposed on the Contents interface
+  SerializedFile* file = static_cast<SerializedFile*>(result.get());
+
+  if (metadata == nullptr) {
+    // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
+    file->ParseMetaData();
+  } else {
+    file->set_metadata(metadata);
+  }
+
+  return result;
+}
+
+std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
+    const std::shared_ptr<::arrow::io::ReadableFileInterface>& source,
+    const ReaderProperties& props, const std::shared_ptr<FileMetaData>& metadata) {
+  std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(source));
+  return Open(std::move(io_wrapper), props, metadata);
+}
+
+std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
+    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
+    const std::shared_ptr<FileMetaData>& metadata) {
+  auto contents = SerializedFile::Open(std::move(source), props, metadata);
+  std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
+  result->Open(std::move(contents));
+  return result;
+}
+
+std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
+    const std::string& path, bool memory_map, const ReaderProperties& props,
+    const std::shared_ptr<FileMetaData>& metadata) {
+  std::shared_ptr<::arrow::io::ReadableFileInterface> source;
+  if (memory_map) {
+    std::shared_ptr<::arrow::io::MemoryMappedFile> handle;
+    PARQUET_THROW_NOT_OK(
+        ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ, &handle));
+    source = handle;
+  } else {
+    std::shared_ptr<::arrow::io::ReadableFile> handle;
+    PARQUET_THROW_NOT_OK(
+        ::arrow::io::ReadableFile::Open(path, props.memory_pool(), &handle));
+    source = handle;
+  }
+
+  return Open(source, props, metadata);
+}
+
+void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
+  contents_ = std::move(contents);
+}
+
+void ParquetFileReader::Close() {
+  if (contents_) {
+    contents_->Close();
+  }
+}
+
+std::shared_ptr<FileMetaData> ParquetFileReader::metadata() const {
+  return contents_->metadata();
+}
+
+std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
+  DCHECK(i < metadata()->num_row_groups()) << "The file only has "
+                                           << metadata()->num_row_groups()
+                                           << " row groups, requested reader for: " << i;
+  return contents_->GetRowGroup(i);
+}
+
+// ----------------------------------------------------------------------
+// File metadata helpers
+
+std::shared_ptr<FileMetaData> ReadMetaData(
+    const std::shared_ptr<::arrow::io::ReadableFileInterface>& source) {
+  return ParquetFileReader::Open(source)->metadata();
+}
+
+// ----------------------------------------------------------------------
+// File scanner for performance testing
+
+int64_t ScanFileContents(std::vector<int> columns, const int32_t column_batch_size,
+                         ParquetFileReader* reader) {
+  std::vector<int16_t> rep_levels(column_batch_size);
+  std::vector<int16_t> def_levels(column_batch_size);
+
+  int num_columns = static_cast<int>(columns.size());
+
+  // If columns were not specified explicitly, scan all columns
+  if (columns.size() == 0) {
+    num_columns = reader->metadata()->num_columns();
+    columns.resize(num_columns);
+    for (int i = 0; i < num_columns; i++) {
+      columns[i] = i;
+    }
+  }
+
+  std::vector<int64_t> total_rows(num_columns, 0);
+
+  for (int r = 0; r < reader->metadata()->num_row_groups(); ++r) {
+    auto group_reader = reader->RowGroup(r);
+    int col = 0;
+    for (auto i : columns) {
+      std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
+      size_t value_byte_size = GetTypeByteSize(col_reader->descr()->physical_type());
+      std::vector<uint8_t> values(column_batch_size * value_byte_size);
+
+      int64_t values_read = 0;
+      while (col_reader->HasNext()) {
+        total_rows[col] +=
+            ScanAllValues(column_batch_size, def_levels.data(), rep_levels.data(),
+                          values.data(), &values_read, col_reader.get());
+      }
+      col++;
+    }
+  }
+
+  for (int i = 1; i < num_columns; ++i) {
+    if (total_rows[0] != total_rows[i]) {
+      throw ParquetException("Parquet error: Total rows among columns do not match");
+    }
+  }
+
+  return total_rows[0];
+}
+
+}  // namespace parquet
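The read path for library users is unchanged apart from the flattened include. A
minimal sketch of the consolidated reader API (hypothetical file name
"example.parquet"; ScanFileContents is the helper defined above, and the empty
column list means all columns):

#include <cstdint>
#include <iostream>
#include <memory>

#include "parquet/file_reader.h"  // was parquet/file/reader.h

int main() {
  // Open without memory mapping; reader properties and metadata use defaults
  std::unique_ptr<parquet::ParquetFileReader> reader =
      parquet::ParquetFileReader::OpenFile("example.parquet", /*memory_map=*/false);

  std::shared_ptr<parquet::FileMetaData> metadata = reader->metadata();
  std::cout << metadata->num_row_groups() << " row groups, "
            << metadata->num_columns() << " columns" << std::endl;

  // Scan all values in all columns, 256 values per batch
  int64_t rows = parquet::ScanFileContents({}, 256, reader.get());
  std::cout << "scanned " << rows << " rows" << std::endl;
  return 0;
}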
diff --git a/src/parquet/file/reader.h b/src/parquet/file_reader.h
similarity index 93%
rename from src/parquet/file/reader.h
rename to src/parquet/file_reader.h
index 7558394..f751e9b 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file_reader.h
@@ -25,8 +25,8 @@
 #include <string>
 #include <vector>
 
-#include "parquet/column_page.h"
-#include "parquet/file/metadata.h"
+#include "parquet/column_reader.h"
+#include "parquet/metadata.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
@@ -71,6 +71,11 @@
   // easily create test fixtures
   // An implementation of the Contents class is defined in the .cc file
   struct Contents {
+    static std::unique_ptr<Contents> Open(
+        std::unique_ptr<RandomAccessSource> source,
+        const ReaderProperties& props = default_reader_properties(),
+        const std::shared_ptr<FileMetaData>& metadata = nullptr);
+
     virtual ~Contents() {}
     // Perform any cleanup associated with the file contents
     virtual void Close() = 0;
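The static Contents::Open factory added above replaces direct construction of
SerializedFile, which is now private to file_reader.cc. A sketch of the test
fixture pattern this enables (MakeReader is a hypothetical helper; source may be
any RandomAccessSource implementation, such as a test double):

#include <memory>
#include <utility>

#include "parquet/file_reader.h"
#include "parquet/util/memory.h"

std::unique_ptr<parquet::ParquetFileReader> MakeReader(
    std::unique_ptr<parquet::RandomAccessSource> source) {
  // Parses footer metadata from the source unless a FileMetaData is supplied
  auto contents = parquet::ParquetFileReader::Contents::Open(std::move(source));
  std::unique_ptr<parquet::ParquetFileReader> reader(new parquet::ParquetFileReader());
  reader->Open(std::move(contents));
  return reader;
}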
diff --git a/src/parquet/file_writer.cc b/src/parquet/file_writer.cc
new file mode 100644
index 0000000..87ee4f6
--- /dev/null
+++ b/src/parquet/file_writer.cc
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_writer.h"
+
+#include "parquet/column_writer.h"
+#include "parquet/schema-internal.h"
+#include "parquet/schema.h"
+#include "parquet/thrift.h"
+#include "parquet/util/memory.h"
+
+using arrow::MemoryPool;
+
+using parquet::schema::GroupNode;
+using parquet::schema::SchemaFlattener;
+
+namespace parquet {
+
+// FIXME: duplicated in file_reader.cc
+static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+
+// ----------------------------------------------------------------------
+// RowGroupWriter public API
+
+RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
+    : contents_(std::move(contents)) {}
+
+void RowGroupWriter::Close() {
+  if (contents_) {
+    contents_->Close();
+  }
+}
+
+ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
+
+int RowGroupWriter::current_column() { return contents_->current_column(); }
+
+int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
+
+int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
+
+// ----------------------------------------------------------------------
+// RowGroupSerializer
+
+// RowGroupWriter::Contents implementation for the Parquet file specification
+class RowGroupSerializer : public RowGroupWriter::Contents {
+ public:
+  RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
+                     const WriterProperties* properties)
+      : sink_(sink),
+        metadata_(metadata),
+        properties_(properties),
+        total_bytes_written_(0),
+        closed_(false),
+        current_column_index_(0),
+        num_rows_(-1) {}
+
+  int num_columns() const override { return metadata_->num_columns(); }
+
+  int64_t num_rows() const override {
+    if (current_column_writer_) {
+      CheckRowsWritten();
+    }
+    return num_rows_ < 0 ? 0 : num_rows_;
+  }
+
+  ColumnWriter* NextColumn() override {
+    if (current_column_writer_) {
+      CheckRowsWritten();
+    }
+
+    // NextColumnChunk() throws if more column chunks than the schema defines are requested
+    auto col_meta = metadata_->NextColumnChunk();
+
+    if (current_column_writer_) {
+      total_bytes_written_ += current_column_writer_->Close();
+    }
+
+    ++current_column_index_;
+
+    const ColumnDescriptor* column_descr = col_meta->descr();
+    std::unique_ptr<PageWriter> pager =
+        PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta,
+                         properties_->memory_pool());
+    current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), properties_);
+    return current_column_writer_.get();
+  }
+
+  int current_column() const override { return metadata_->current_column(); }
+
+  void Close() override {
+    if (!closed_) {
+      closed_ = true;
+
+      if (current_column_writer_) {
+        CheckRowsWritten();
+        total_bytes_written_ += current_column_writer_->Close();
+        current_column_writer_.reset();
+      }
+
+      // Ensures all columns have been written
+      metadata_->Finish(total_bytes_written_);
+    }
+  }
+
+ private:
+  OutputStream* sink_;
+  mutable RowGroupMetaDataBuilder* metadata_;
+  const WriterProperties* properties_;
+  int64_t total_bytes_written_;
+  bool closed_;
+  int current_column_index_;
+  mutable int64_t num_rows_;
+
+  void CheckRowsWritten() const {
+    int64_t current_rows = current_column_writer_->rows_written();
+    if (num_rows_ < 0) {
+      num_rows_ = current_rows;
+      metadata_->set_num_rows(current_rows);
+    } else if (num_rows_ != current_rows) {
+      std::stringstream ss;
+      ss << "Column " << current_column_index_ << " had " << current_rows
+         << " while previous column had " << num_rows_;
+      throw ParquetException(ss.str());
+    }
+  }
+
+  std::shared_ptr<ColumnWriter> current_column_writer_;
+};
+
+// ----------------------------------------------------------------------
+// FileSerializer
+
+// An implementation of ParquetFileWriter::Contents that deals with the Parquet
+// file structure, Thrift serialization, and other internal matters
+
+class FileSerializer : public ParquetFileWriter::Contents {
+ public:
+  static std::unique_ptr<ParquetFileWriter::Contents> Open(
+      const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
+      const std::shared_ptr<WriterProperties>& properties,
+      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+    std::unique_ptr<ParquetFileWriter::Contents> result(
+        new FileSerializer(sink, schema, properties, key_value_metadata));
+
+    return result;
+  }
+
+  void Close() override {
+    if (is_open_) {
+      if (row_group_writer_) {
+        num_rows_ += row_group_writer_->num_rows();
+        row_group_writer_->Close();
+      }
+      row_group_writer_.reset();
+
+      // Write the file footer: serialized metadata plus trailing magic bytes
+      WriteMetaData();
+
+      sink_->Close();
+      is_open_ = false;
+    }
+  }
+
+  int num_columns() const override { return schema_.num_columns(); }
+
+  int num_row_groups() const override { return num_row_groups_; }
+
+  int64_t num_rows() const override { return num_rows_; }
+
+  const std::shared_ptr<WriterProperties>& properties() const override {
+    return properties_;
+  }
+
+  RowGroupWriter* AppendRowGroup() override {
+    if (row_group_writer_) {
+      row_group_writer_->Close();
+    }
+    num_row_groups_++;
+    auto rg_metadata = metadata_->AppendRowGroup();
+    std::unique_ptr<RowGroupWriter::Contents> contents(
+        new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
+    row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
+    return row_group_writer_.get();
+  }
+
+  ~FileSerializer() {
+    try {
+      Close();
+    } catch (...) {
+    }
+  }
+
+ private:
+  FileSerializer(const std::shared_ptr<OutputStream>& sink,
+                 const std::shared_ptr<GroupNode>& schema,
+                 const std::shared_ptr<WriterProperties>& properties,
+                 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
+      : ParquetFileWriter::Contents(schema, key_value_metadata),
+        sink_(sink),
+        is_open_(true),
+        properties_(properties),
+        num_row_groups_(0),
+        num_rows_(0),
+        metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) {
+    StartFile();
+  }
+
+  std::shared_ptr<OutputStream> sink_;
+  bool is_open_;
+  const std::shared_ptr<WriterProperties> properties_;
+  int num_row_groups_;
+  int64_t num_rows_;
+  std::unique_ptr<FileMetaDataBuilder> metadata_;
+  std::unique_ptr<RowGroupWriter> row_group_writer_;
+
+  void StartFile() {
+    // Parquet files always start with PAR1
+    sink_->Write(PARQUET_MAGIC, 4);
+  }
+
+  void WriteMetaData() {
+    // Record the offset where the metadata begins so its length can be computed
+    uint32_t metadata_len = static_cast<uint32_t>(sink_->Tell());
+
+    // Finalize and serialize the accumulated FileMetaData
+    auto metadata = metadata_->Finish();
+    metadata->WriteTo(sink_.get());
+    metadata_len = static_cast<uint32_t>(sink_->Tell()) - metadata_len;
+
+    // Write the footer: 4-byte metadata length, then the PAR1 magic
+    sink_->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
+    sink_->Write(PARQUET_MAGIC, 4);
+  }
+};
+
+// ----------------------------------------------------------------------
+// ParquetFileWriter public API
+
+ParquetFileWriter::ParquetFileWriter() {}
+
+ParquetFileWriter::~ParquetFileWriter() {
+  try {
+    Close();
+  } catch (...) {
+  }
+}
+
+std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
+    const std::shared_ptr<::arrow::io::OutputStream>& sink,
+    const std::shared_ptr<GroupNode>& schema,
+    const std::shared_ptr<WriterProperties>& properties,
+    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+  return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties,
+              key_value_metadata);
+}
+
+std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
+    const std::shared_ptr<OutputStream>& sink,
+    const std::shared_ptr<schema::GroupNode>& schema,
+    const std::shared_ptr<WriterProperties>& properties,
+    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+  auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata);
+  std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
+  result->Open(std::move(contents));
+  return result;
+}
+
+const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
+
+const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
+  return contents_->schema()->Column(i);
+}
+
+int ParquetFileWriter::num_columns() const { return contents_->num_columns(); }
+
+int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); }
+
+int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); }
+
+const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata()
+    const {
+  return contents_->key_value_metadata();
+}
+
+void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
+  contents_ = std::move(contents);
+}
+
+void ParquetFileWriter::Close() {
+  if (contents_) {
+    contents_->Close();
+    contents_.reset();
+  }
+}
+
+RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
+  return contents_->AppendRowGroup();
+}
+
+RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
+  return AppendRowGroup();
+}
+
+const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
+  return contents_->properties();
+}
+
+}  // namespace parquet
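The write path likewise only changes its include. A minimal sketch of the
consolidated writer API, assuming the InMemoryOutputStream sink from
parquet/util/memory.h and a hypothetical single INT32 column:

#include <cstdint>
#include <memory>

#include "parquet/column_writer.h"
#include "parquet/file_writer.h"  // was parquet/file/writer.h
#include "parquet/schema.h"
#include "parquet/util/memory.h"

int main() {
  using parquet::schema::GroupNode;
  using parquet::schema::PrimitiveNode;

  // Schema with one required (non-nullable) INT32 column
  parquet::schema::NodeVector fields;
  fields.push_back(PrimitiveNode::Make("int32_field", parquet::Repetition::REQUIRED,
                                       parquet::Type::INT32));
  auto schema = std::static_pointer_cast<GroupNode>(
      GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

  auto sink = std::make_shared<parquet::InMemoryOutputStream>();
  std::unique_ptr<parquet::ParquetFileWriter> writer = parquet::ParquetFileWriter::Open(
      sink, schema, parquet::default_writer_properties());

  parquet::RowGroupWriter* rg_writer = writer->AppendRowGroup();
  auto* int_writer = static_cast<parquet::Int32ColumnWriter*>(rg_writer->NextColumn());
  int32_t values[] = {1, 2, 3};
  // REQUIRED column: no definition or repetition levels needed
  int_writer->WriteBatch(3, nullptr, nullptr, values);

  writer->Close();  // closes the row group, writes footer metadata and magic
  return 0;
}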
diff --git a/src/parquet/file/writer.h b/src/parquet/file_writer.h
similarity index 98%
rename from src/parquet/file/writer.h
rename to src/parquet/file_writer.h
index 844aa16..f165261 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file_writer.h
@@ -21,7 +21,7 @@
 #include <cstdint>
 #include <memory>
 
-#include "parquet/file/metadata.h"
+#include "parquet/metadata.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/util/memory.h"
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/metadata-test.cc
similarity index 99%
rename from src/parquet/file/file-metadata-test.cc
rename to src/parquet/metadata-test.cc
index 49cba3a..b20293b 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/metadata-test.cc
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "parquet/metadata.h"
 #include <gtest/gtest.h>
-#include "parquet/file/metadata.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
 #include "parquet/types.h"
diff --git a/src/parquet/file/metadata.cc b/src/parquet/metadata.cc
similarity index 99%
rename from src/parquet/file/metadata.cc
rename to src/parquet/metadata.cc
index 9f1cdd7..1c7db86 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/metadata.cc
@@ -20,7 +20,7 @@
 #include <vector>
 
 #include "parquet/exception.h"
-#include "parquet/file/metadata.h"
+#include "parquet/metadata.h"
 #include "parquet/schema-internal.h"
 #include "parquet/schema.h"
 #include "parquet/thrift.h"
diff --git a/src/parquet/file/metadata.h b/src/parquet/metadata.h
similarity index 100%
rename from src/parquet/file/metadata.h
rename to src/parquet/metadata.h
diff --git a/src/parquet/file/printer.cc b/src/parquet/printer.cc
similarity index 99%
rename from src/parquet/file/printer.cc
rename to src/parquet/printer.cc
index 727552d..88b5528 100644
--- a/src/parquet/file/printer.cc
+++ b/src/parquet/printer.cc
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "parquet/file/printer.h"
+#include "parquet/printer.h"
 
 #include <string>
 #include <vector>
diff --git a/src/parquet/file/printer.h b/src/parquet/printer.h
similarity index 97%
rename from src/parquet/file/printer.h
rename to src/parquet/printer.h
index a18af4a..3b82882 100644
--- a/src/parquet/file/printer.h
+++ b/src/parquet/printer.h
@@ -25,7 +25,7 @@
 #include <string>
 #include <vector>
 
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
 
 namespace parquet {
 
diff --git a/src/parquet/properties-test.cc b/src/parquet/properties-test.cc
index 4a063c1..c740b59 100644
--- a/src/parquet/properties-test.cc
+++ b/src/parquet/properties-test.cc
@@ -19,7 +19,7 @@
 
 #include <string>
 
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
 #include "parquet/properties.h"
 
 namespace parquet {
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index cefa452..c536fdc 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -27,9 +27,8 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_scanner.h"
-#include "parquet/file/printer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/printer.h"
 #include "parquet/util/memory.h"
 
 using std::string;
@@ -204,7 +203,7 @@
 TEST_F(TestLocalFile, FileClosedOnDestruction) {
   bool close_called = false;
   {
-    auto contents = SerializedFile::Open(
+    auto contents = ParquetFileReader::Contents::Open(
         std::unique_ptr<RandomAccessSource>(new HelperFileClosed(handle, &close_called)));
     std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
     result->Open(std::move(contents));
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index fc905b5..e5992c6 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -26,8 +26,8 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
-#include "parquet/file/reader.h"
-#include "parquet/file/writer.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
 #include "parquet/test-specialization.h"
@@ -194,8 +194,9 @@
 }
 
 template <typename TestType>
-typename std::vector<typename TestType::c_type> TestRowGroupStatistics<
-    TestType>::GetDeepCopy(const std::vector<typename TestType::c_type>& values) {
+typename std::vector<typename TestType::c_type>
+TestRowGroupStatistics<TestType>::GetDeepCopy(
+    const std::vector<typename TestType::c_type>& values) {
   return values;
 }
 
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
index 0698652..ac6d0a1 100644
--- a/src/parquet/test-util.h
+++ b/src/parquet/test-util.h
@@ -79,8 +79,7 @@
   explicit MockPageReader(const vector<shared_ptr<Page>>& pages)
       : pages_(pages), page_index_(0) {}
 
-  // Implement the PageReader interface
-  virtual shared_ptr<Page> NextPage() {
+  shared_ptr<Page> NextPage() override {
     if (page_index_ == static_cast<int>(pages_.size())) {
       // EOS to consumer
       return shared_ptr<Page>(nullptr);
@@ -88,6 +87,9 @@
     return pages_[page_index_++];
   }
 
+  // No-op
+  void set_max_page_header_size(uint32_t size) override {}
+
  private:
   vector<shared_ptr<Page>> pages_;
   int page_index_;
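For reference, the include-path migration implied by the renames in this patch:

  parquet/file/reader.h    ->  parquet/file_reader.h
  parquet/file/writer.h    ->  parquet/file_writer.h
  parquet/file/metadata.h  ->  parquet/metadata.h
  parquet/file/printer.h   ->  parquet/printer.h

reader-internal.h and writer-internal.h have no public successors: SerializedFile
and FileSerializer are now private to file_reader.cc and file_writer.cc, and test
code reaches them through ParquetFileReader::Contents::Open as shown in
reader-test.cc above.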