PARQUET-1037: Allow arbitrary-size row groups

The main change is that you no longer have to specify the number of rows in a row group up front when writing it. This is signalled through a "row_count_determined" flag that is threaded through the relevant classes.

The AppendRowGroup(int64_t num_rows) method behaves exactly as before.
The new AppendRowGroup() overload fixes the row group's row count once the first column has been written, and enforces that all subsequent columns match it. Empty row groups are still not allowed.
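
For clarity, here is a minimal sketch of how a caller might use the new overload. It is not taken from this patch; the column name, helper names, and include paths are assumptions for illustration only.

    // Usage sketch (not part of this patch): write a row group whose row count is
    // not known up front. Helper names and include paths are assumptions.
    #include <memory>
    #include <vector>

    #include "parquet/api/writer.h"   // assumed public writer header
    #include "parquet/util/memory.h"  // assumed home of InMemoryOutputStream

    using parquet::schema::GroupNode;
    using parquet::schema::PrimitiveNode;

    static std::shared_ptr<GroupNode> MakeInt64Schema() {
      auto field = PrimitiveNode::Make("value", parquet::Repetition::REQUIRED,
                                       parquet::Type::INT64);
      return std::static_pointer_cast<GroupNode>(
          GroupNode::Make("schema", parquet::Repetition::REQUIRED, {field}));
    }

    void WriteUnsizedRowGroup(const std::vector<std::vector<int64_t>>& batches) {
      auto sink = std::make_shared<parquet::InMemoryOutputStream>();
      auto file_writer = parquet::ParquetFileWriter::Open(sink, MakeInt64Schema());

      // No row count is passed: the total written through the first column fixes
      // the row group's size, and any later columns would have to match it.
      parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
      auto* col_writer = static_cast<parquet::TypedColumnWriter<parquet::Int64Type>*>(
          rg_writer->NextColumn());
      for (const auto& batch : batches) {
        // REQUIRED column, so no definition or repetition levels are needed.
        col_writer->WriteBatch(static_cast<int64_t>(batch.size()), nullptr, nullptr,
                               batch.data());
      }
      col_writer->Close();
      rg_writer->Close();
      file_writer->Close();
    }

Per the changes below, the row-count check runs when the row group's NextColumn() or Close() is called, so a mismatch between columns raises a ParquetException at that point.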

(You'll definitely want to squash these commits. Apologies for the poor git usage.)

Author: Toby Shaw <labuser@LABWKS10>
Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #378 from TobyShaw/master and squashes the following commits:

5ba1b75 [Wes McKinney] * Remove number of expected rows from ColumnWriter in a backwards compatible way, refactoring * Fix issue where column length was being checked twice after failed write
14bc5e8 [Toby Shaw] Allow arbitrary size row groups
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 56b4770..1786ca5 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -795,7 +795,7 @@
   // to write an Int96 file.
   this->sink_ = std::make_shared<InMemoryOutputStream>();
   auto writer = ParquetFileWriter::Open(this->sink_, schema);
-  RowGroupWriter* rg_writer = writer->AppendRowGroup(1);
+  RowGroupWriter* rg_writer = writer->AppendRowGroup();
   ColumnWriter* c_writer = rg_writer->NextColumn();
   auto typed_writer = dynamic_cast<TypedColumnWriter<Int96Type>*>(c_writer);
   ASSERT_NE(typed_writer, nullptr);
@@ -954,7 +954,7 @@
         reinterpret_cast<ParquetCDataType<TestType>*>(values_buffer.data());
     std::copy(values.cbegin(), values.cend(), values_parquet);
     for (int i = 0; i < num_chunks; i++) {
-      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
+      auto row_group_writer = file_writer->AppendRowGroup();
       auto column_writer =
           static_cast<ParquetWriter<TestType>*>(row_group_writer->NextColumn());
       ParquetCDataType<TestType>* data = values_parquet + i * chunk_size;
@@ -1496,7 +1496,7 @@
 
     writer_ = parquet::ParquetFileWriter::Open(nested_parquet_, schema,
                                                default_writer_properties());
-    row_group_writer_ = writer_->AppendRowGroup(num_rows);
+    row_group_writer_ = writer_->AppendRowGroup();
   }
 
   void FinalizeParquetFile() {
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 55af292..7f1c45c 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -310,6 +310,7 @@
   std::unique_ptr<ParquetFileWriter> writer_;
   RowGroupWriter* row_group_writer_;
   std::shared_ptr<ArrowWriterProperties> arrow_properties_;
+  bool closed_;
 };
 
 FileWriter::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
@@ -318,13 +319,14 @@
       data_buffer_(pool),
       writer_(std::move(writer)),
       row_group_writer_(nullptr),
-      arrow_properties_(arrow_properties) {}
+      arrow_properties_(arrow_properties),
+      closed_(false) {}
 
 Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
   if (row_group_writer_ != nullptr) {
     PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
   }
-  PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size));
+  PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
   return Status::OK();
 }
 
@@ -791,10 +793,14 @@
 // ----------------------------------------------------------------------
 
 Status FileWriter::Impl::Close() {
-  if (row_group_writer_ != nullptr) {
-    PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+  if (!closed_) {
+    // Make idempotent
+    closed_ = true;
+    if (row_group_writer_ != nullptr) {
+      PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+    }
+    PARQUET_CATCH_NOT_OK(writer_->Close());
   }
-  PARQUET_CATCH_NOT_OK(writer_->Close());
   return Status::OK();
 }
 
diff --git a/src/parquet/column-io-benchmark.cc b/src/parquet/column-io-benchmark.cc
index 2abf6fa..c20d6e2 100644
--- a/src/parquet/column-io-benchmark.cc
+++ b/src/parquet/column-io-benchmark.cc
@@ -35,8 +35,8 @@
                                          const WriterProperties* properties) {
   std::unique_ptr<SerializedPageWriter> pager(
       new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
-  return std::unique_ptr<Int64Writer>(new Int64Writer(
-      metadata, std::move(pager), output_size, Encoding::PLAIN, properties));
+  return std::unique_ptr<Int64Writer>(
+      new Int64Writer(metadata, std::move(pager), Encoding::PLAIN, properties));
 }
 
 std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 47b86e3..3e4c04f 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -85,8 +85,8 @@
       wp_builder.encoding(column_properties.encoding);
     }
     writer_properties_ = wp_builder.build();
-    std::shared_ptr<ColumnWriter> writer = ColumnWriter::Make(
-        metadata_.get(), std::move(pager), output_size, writer_properties_.get());
+    std::shared_ptr<ColumnWriter> writer =
+        ColumnWriter::Make(metadata_.get(), std::move(pager), writer_properties_.get());
     return std::static_pointer_cast<TypedColumnWriter<TestType>>(writer);
   }
 
@@ -402,39 +402,6 @@
   ASSERT_EQ(this->values_, this->values_out_);
 }
 
-TYPED_TEST(TestPrimitiveWriter, RequiredTooFewRows) {
-  this->GenerateData(SMALL_SIZE - 1);
-
-  auto writer = this->BuildWriter();
-  writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
-  ASSERT_THROW(writer->Close(), ParquetException);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RequiredTooMany) {
-  this->GenerateData(2 * SMALL_SIZE);
-
-  auto writer = this->BuildWriter();
-  ASSERT_THROW(
-      writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_),
-      ParquetException);
-}
-
-TYPED_TEST(TestPrimitiveWriter, RepeatedTooFewRows) {
-  // Optional and repeated, so definition and repetition levels
-  this->SetUpSchema(Repetition::REPEATED);
-
-  this->GenerateData(SMALL_SIZE);
-  std::vector<int16_t> definition_levels(SMALL_SIZE, 1);
-  definition_levels[1] = 0;
-  std::vector<int16_t> repetition_levels(SMALL_SIZE, 0);
-  repetition_levels[3] = 1;
-
-  auto writer = this->BuildWriter();
-  writer->WriteBatch(this->values_.size(), definition_levels.data(),
-                     repetition_levels.data(), this->values_ptr_);
-  ASSERT_THROW(writer->Close(), ParquetException);
-}
-
 TYPED_TEST(TestPrimitiveWriter, RequiredLargeChunk) {
   this->GenerateData(LARGE_SIZE);
 
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index ac7e2ba..4aadf2b 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -113,13 +113,11 @@
 }
 
 ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
-                           std::unique_ptr<PageWriter> pager, int64_t expected_rows,
-                           bool has_dictionary, Encoding::type encoding,
-                           const WriterProperties* properties)
+                           std::unique_ptr<PageWriter> pager, bool has_dictionary,
+                           Encoding::type encoding, const WriterProperties* properties)
     : metadata_(metadata),
       descr_(metadata->descr()),
       pager_(std::move(pager)),
-      expected_rows_(expected_rows),
       has_dictionary_(has_dictionary),
       encoding_(encoding),
       properties_(properties),
@@ -127,7 +125,7 @@
       pool_(properties->memory_pool()),
       num_buffered_values_(0),
       num_buffered_encoded_values_(0),
-      num_rows_(0),
+      rows_written_(0),
       total_bytes_written_(0),
       closed_(false),
       fallback_(false) {
@@ -274,13 +272,6 @@
     pager_->Close(has_dictionary_, fallback_);
   }
 
-  if (num_rows_ != expected_rows_) {
-    std::stringstream ss;
-    ss << "Written rows: " << num_rows_ << " != expected rows: " << expected_rows_
-       << "in the current column chunk";
-    throw ParquetException(ss.str());
-  }
-
   return total_bytes_written_;
 }
 
@@ -301,11 +292,10 @@
 template <typename Type>
 TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
                                            std::unique_ptr<PageWriter> pager,
-                                           int64_t expected_rows, Encoding::type encoding,
+                                           Encoding::type encoding,
                                            const WriterProperties* properties)
-    : ColumnWriter(metadata, std::move(pager), expected_rows,
-                   (encoding == Encoding::PLAIN_DICTIONARY ||
-                    encoding == Encoding::RLE_DICTIONARY),
+    : ColumnWriter(metadata, std::move(pager), (encoding == Encoding::PLAIN_DICTIONARY ||
+                                                encoding == Encoding::RLE_DICTIONARY),
                    encoding, properties) {
   switch (encoding) {
     case Encoding::PLAIN:
@@ -384,7 +374,6 @@
 
 std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
                                                  std::unique_ptr<PageWriter> pager,
-                                                 int64_t expected_rows,
                                                  const WriterProperties* properties) {
   const ColumnDescriptor* descr = metadata->descr();
   Encoding::type encoding = properties->encoding(descr->path());
@@ -394,29 +383,29 @@
   }
   switch (descr->physical_type()) {
     case Type::BOOLEAN:
-      return std::make_shared<BoolWriter>(metadata, std::move(pager), expected_rows,
-                                          encoding, properties);
+      return std::make_shared<BoolWriter>(metadata, std::move(pager), encoding,
+                                          properties);
     case Type::INT32:
-      return std::make_shared<Int32Writer>(metadata, std::move(pager), expected_rows,
-                                           encoding, properties);
+      return std::make_shared<Int32Writer>(metadata, std::move(pager), encoding,
+                                           properties);
     case Type::INT64:
-      return std::make_shared<Int64Writer>(metadata, std::move(pager), expected_rows,
-                                           encoding, properties);
+      return std::make_shared<Int64Writer>(metadata, std::move(pager), encoding,
+                                           properties);
     case Type::INT96:
-      return std::make_shared<Int96Writer>(metadata, std::move(pager), expected_rows,
-                                           encoding, properties);
+      return std::make_shared<Int96Writer>(metadata, std::move(pager), encoding,
+                                           properties);
     case Type::FLOAT:
-      return std::make_shared<FloatWriter>(metadata, std::move(pager), expected_rows,
-                                           encoding, properties);
+      return std::make_shared<FloatWriter>(metadata, std::move(pager), encoding,
+                                           properties);
     case Type::DOUBLE:
-      return std::make_shared<DoubleWriter>(metadata, std::move(pager), expected_rows,
-                                            encoding, properties);
+      return std::make_shared<DoubleWriter>(metadata, std::move(pager), encoding,
+                                            properties);
     case Type::BYTE_ARRAY:
-      return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), expected_rows,
-                                               encoding, properties);
+      return std::make_shared<ByteArrayWriter>(metadata, std::move(pager), encoding,
+                                               properties);
     case Type::FIXED_LEN_BYTE_ARRAY:
-      return std::make_shared<FixedLenByteArrayWriter>(
-          metadata, std::move(pager), expected_rows, encoding, properties);
+      return std::make_shared<FixedLenByteArrayWriter>(metadata, std::move(pager),
+                                                       encoding, properties);
     default:
       ParquetException::NYI("type reader not implemented");
   }
@@ -453,18 +442,14 @@
     // Count the occasions where we start a new row
     for (int64_t i = 0; i < num_values; ++i) {
       if (rep_levels[i] == 0) {
-        num_rows_++;
+        rows_written_++;
       }
     }
 
     WriteRepetitionLevels(num_values, rep_levels);
   } else {
     // Each value is exactly one row
-    num_rows_ += static_cast<int>(num_values);
-  }
-
-  if (num_rows_ > expected_rows_) {
-    throw ParquetException("More rows were written in the column chunk than expected");
+    rows_written_ += static_cast<int>(num_values);
   }
 
   // PARQUET-780
@@ -527,18 +512,14 @@
     // Count the occasions where we start a new row
     for (int64_t i = 0; i < num_values; ++i) {
       if (rep_levels[i] == 0) {
-        num_rows_++;
+        rows_written_++;
       }
     }
 
     WriteRepetitionLevels(num_values, rep_levels);
   } else {
     // Each value is exactly one row
-    num_rows_ += static_cast<int>(num_values);
-  }
-
-  if (num_rows_ > expected_rows_) {
-    throw ParquetException("More rows were written in the column chunk than expected");
+    rows_written_ += static_cast<int>(num_values);
   }
 
   if (descr_->schema_node()->is_optional()) {
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 837d2d0..0408747 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -73,12 +73,11 @@
 class PARQUET_EXPORT ColumnWriter {
  public:
   ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>,
-               int64_t expected_rows, bool has_dictionary, Encoding::type encoding,
+               bool has_dictionary, Encoding::type encoding,
                const WriterProperties* properties);
 
   static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
                                             std::unique_ptr<PageWriter>,
-                                            int64_t expected_rows,
                                             const WriterProperties* properties);
 
   Type::type type() const { return descr_->physical_type(); }
@@ -92,6 +91,8 @@
    */
   int64_t Close();
 
+  int64_t rows_written() const { return rows_written_; }
+
  protected:
   virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
 
@@ -138,8 +139,6 @@
 
   std::unique_ptr<PageWriter> pager_;
 
-  // The number of rows that should be written in this column chunk.
-  int64_t expected_rows_;
   bool has_dictionary_;
   Encoding::type encoding_;
   const WriterProperties* properties_;
@@ -162,7 +161,7 @@
   int64_t num_buffered_encoded_values_;
 
   // Total number of rows written with this ColumnWriter
-  int num_rows_;
+  int rows_written_;
 
   // Records the total number of bytes written by the serializer
   int64_t total_bytes_written_;
@@ -195,8 +194,8 @@
   typedef typename DType::c_type T;
 
   TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
-                    std::unique_ptr<PageWriter> pager, int64_t expected_rows,
-                    Encoding::type encoding, const WriterProperties* properties);
+                    std::unique_ptr<PageWriter> pager, Encoding::type encoding,
+                    const WriterProperties* properties);
 
   // Write a batch of repetition levels, definition levels, and values to the
   // column.
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index 5bd8419..49cba3a 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -55,8 +55,8 @@
       .set_max(std::string(reinterpret_cast<const char*>(&float_max), 4));
 
   auto f_builder = FileMetaDataBuilder::Make(&schema, props);
-  auto rg1_builder = f_builder->AppendRowGroup(nrows / 2);
-  auto rg2_builder = f_builder->AppendRowGroup(nrows / 2);
+  auto rg1_builder = f_builder->AppendRowGroup();
+  auto rg2_builder = f_builder->AppendRowGroup();
 
   // Write the metadata
   // rowgroup1 metadata
@@ -67,6 +67,8 @@
   col2_builder->SetStatistics(true, stats_float);
   col1_builder->Finish(nrows / 2, 4, 0, 10, 512, 600, true, false);
   col2_builder->Finish(nrows / 2, 24, 0, 30, 512, 600, true, false);
+
+  rg1_builder->set_num_rows(nrows / 2);
   rg1_builder->Finish(1024);
 
   // rowgroup2 metadata
@@ -77,6 +79,8 @@
   col2_builder->SetStatistics(true, stats_float);
   col1_builder->Finish(nrows / 2, 6, 0, 10, 512, 600, true, false);
   col2_builder->Finish(nrows / 2, 16, 0, 26, 512, 600, true, false);
+
+  rg2_builder->set_num_rows(nrows / 2);
   rg2_builder->Finish(1024);
 
   // Read the metadata
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 059df0b..75f3fbd 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -41,11 +41,15 @@
 
   void SetUp() {
     num_columns_ = 4;
+    num_rowgroups_ = 2;
+    rows_per_rowgroup_ = 50;
     this->SetUpSchema(Repetition::OPTIONAL, num_columns_);
   }
 
  protected:
   int num_columns_;
+  int num_rowgroups_;
+  int rows_per_rowgroup_;
 
   void FileSerializeTest(Compression::type codec_type) {
     std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
@@ -59,48 +63,116 @@
     std::shared_ptr<WriterProperties> writer_properties = prop_builder.build();
 
     auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
-    auto row_group_writer = file_writer->AppendRowGroup(100);
+    for (int rg = 0; rg < num_rowgroups_; ++rg) {
+      RowGroupWriter* row_group_writer;
+      row_group_writer = file_writer->AppendRowGroup();
+      this->GenerateData(rows_per_rowgroup_);
+      for (int col = 0; col < num_columns_; ++col) {
+        auto column_writer =
+            static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+        column_writer->WriteBatch(rows_per_rowgroup_, this->def_levels_.data(), nullptr,
+                                  this->values_ptr_);
+        column_writer->Close();
+      }
 
-    this->GenerateData(100);
-    for (int i = 0; i < num_columns_; ++i) {
-      auto column_writer =
-          static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
-      column_writer->WriteBatch(100, this->def_levels_.data(), nullptr,
-                                this->values_ptr_);
-      column_writer->Close();
+      row_group_writer->Close();
     }
-
-    row_group_writer->Close();
     file_writer->Close();
 
     auto buffer = sink->GetBuffer();
+    int num_rows_ = num_rowgroups_ * rows_per_rowgroup_;
 
     auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
     auto file_reader = ParquetFileReader::Open(source);
     ASSERT_EQ(num_columns_, file_reader->metadata()->num_columns());
-    ASSERT_EQ(1, file_reader->metadata()->num_row_groups());
-    ASSERT_EQ(100, file_reader->metadata()->num_rows());
+    ASSERT_EQ(num_rowgroups_, file_reader->metadata()->num_row_groups());
+    ASSERT_EQ(num_rows_, file_reader->metadata()->num_rows());
 
-    auto rg_reader = file_reader->RowGroup(0);
-    ASSERT_EQ(num_columns_, rg_reader->metadata()->num_columns());
-    ASSERT_EQ(100, rg_reader->metadata()->num_rows());
-    // Check that the specified compression was actually used.
-    ASSERT_EQ(codec_type, rg_reader->metadata()->ColumnChunk(0)->compression());
+    for (int rg = 0; rg < num_rowgroups_; ++rg) {
+      auto rg_reader = file_reader->RowGroup(rg);
+      ASSERT_EQ(num_columns_, rg_reader->metadata()->num_columns());
+      ASSERT_EQ(rows_per_rowgroup_, rg_reader->metadata()->num_rows());
+      // Check that the specified compression was actually used.
+      ASSERT_EQ(codec_type, rg_reader->metadata()->ColumnChunk(0)->compression());
 
-    int64_t values_read;
+      int64_t values_read;
 
-    for (int i = 0; i < num_columns_; ++i) {
-      std::vector<int16_t> def_levels_out(100);
-      std::vector<int16_t> rep_levels_out(100);
-      auto col_reader =
-          std::static_pointer_cast<TypedColumnReader<TestType>>(rg_reader->Column(i));
-      this->SetupValuesOut(100);
-      col_reader->ReadBatch(100, def_levels_out.data(), rep_levels_out.data(),
-                            this->values_out_ptr_, &values_read);
-      this->SyncValuesOut();
-      ASSERT_EQ(100, values_read);
-      ASSERT_EQ(this->values_, this->values_out_);
-      ASSERT_EQ(this->def_levels_, def_levels_out);
+      for (int i = 0; i < num_columns_; ++i) {
+        std::vector<int16_t> def_levels_out(rows_per_rowgroup_);
+        std::vector<int16_t> rep_levels_out(rows_per_rowgroup_);
+        auto col_reader =
+            std::static_pointer_cast<TypedColumnReader<TestType>>(rg_reader->Column(i));
+        this->SetupValuesOut(rows_per_rowgroup_);
+        col_reader->ReadBatch(rows_per_rowgroup_, def_levels_out.data(),
+                              rep_levels_out.data(), this->values_out_ptr_, &values_read);
+        this->SyncValuesOut();
+        ASSERT_EQ(rows_per_rowgroup_, values_read);
+        ASSERT_EQ(this->values_, this->values_out_);
+        ASSERT_EQ(this->def_levels_, def_levels_out);
+      }
+    }
+  }
+
+  void UnequalNumRows(int64_t max_rows, const std::vector<int64_t> rows_per_column) {
+    std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
+    auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
+
+    std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
+
+    auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
+
+    RowGroupWriter* row_group_writer;
+    row_group_writer = file_writer->AppendRowGroup();
+
+    this->GenerateData(max_rows);
+    for (int col = 0; col < num_columns_; ++col) {
+      auto column_writer =
+          static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+      column_writer->WriteBatch(rows_per_column[col], this->def_levels_.data(), nullptr,
+                                this->values_ptr_);
+      column_writer->Close();
+    }
+    row_group_writer->Close();
+    file_writer->Close();
+  }
+
+  void RepeatedUnequalRows() {
+    // Optional and repeated, so definition and repetition levels
+    this->SetUpSchema(Repetition::REPEATED);
+
+    const int kNumRows = 100;
+    this->GenerateData(kNumRows);
+
+    std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
+    auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
+    std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
+    auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
+
+    RowGroupWriter* row_group_writer;
+    row_group_writer = file_writer->AppendRowGroup();
+
+    this->GenerateData(kNumRows);
+
+    std::vector<int16_t> definition_levels(kNumRows, 1);
+    std::vector<int16_t> repetition_levels(kNumRows, 0);
+
+    {
+      auto column_writer =
+          static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+      column_writer->WriteBatch(kNumRows, definition_levels.data(),
+                                repetition_levels.data(), this->values_ptr_);
+      column_writer->Close();
+    }
+
+    definition_levels[1] = 0;
+    repetition_levels[3] = 1;
+
+    {
+      auto column_writer =
+          static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+      column_writer->WriteBatch(kNumRows, definition_levels.data(),
+                                repetition_levels.data(), this->values_ptr_);
+      column_writer->Close();
     }
   }
 };
@@ -115,6 +187,20 @@
   this->FileSerializeTest(Compression::UNCOMPRESSED);
 }
 
+TYPED_TEST(TestSerialize, TooFewRows) {
+  std::vector<int64_t> num_rows = {100, 100, 100, 99};
+  ASSERT_THROW(this->UnequalNumRows(100, num_rows), ParquetException);
+}
+
+TYPED_TEST(TestSerialize, TooManyRows) {
+  std::vector<int64_t> num_rows = {100, 100, 100, 101};
+  ASSERT_THROW(this->UnequalNumRows(101, num_rows), ParquetException);
+}
+
+TYPED_TEST(TestSerialize, RepeatedTooFewRows) {
+  ASSERT_THROW(this->RepeatedUnequalRows(), ParquetException);
+}
+
 TYPED_TEST(TestSerialize, SmallFileSnappy) {
   this->FileSerializeTest(Compression::SNAPPY);
 }
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 6e7fc3b..9f1cdd7 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -651,13 +651,11 @@
 
 class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
  public:
-  explicit RowGroupMetaDataBuilderImpl(int64_t num_rows,
-                                       const std::shared_ptr<WriterProperties>& props,
+  explicit RowGroupMetaDataBuilderImpl(const std::shared_ptr<WriterProperties>& props,
                                        const SchemaDescriptor* schema, uint8_t* contents)
       : properties_(props), schema_(schema), current_column_(0) {
     row_group_ = reinterpret_cast<format::RowGroup*>(contents);
     InitializeColumns(schema->num_columns());
-    row_group_->__set_num_rows(num_rows);
   }
   ~RowGroupMetaDataBuilderImpl() {}
 
@@ -702,8 +700,12 @@
     row_group_->__set_total_byte_size(total_byte_size);
   }
 
+  void set_num_rows(int64_t num_rows) { row_group_->num_rows = num_rows; }
+
   int num_columns() { return static_cast<int>(row_group_->columns.size()); }
 
+  int64_t num_rows() { return row_group_->num_rows; }
+
  private:
   void InitializeColumns(int ncols) { row_group_->columns.resize(ncols); }
 
@@ -715,17 +717,17 @@
 };
 
 std::unique_ptr<RowGroupMetaDataBuilder> RowGroupMetaDataBuilder::Make(
-    int64_t num_rows, const std::shared_ptr<WriterProperties>& props,
-    const SchemaDescriptor* schema_, uint8_t* contents) {
+    const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
+    uint8_t* contents) {
   return std::unique_ptr<RowGroupMetaDataBuilder>(
-      new RowGroupMetaDataBuilder(num_rows, props, schema_, contents));
+      new RowGroupMetaDataBuilder(props, schema_, contents));
 }
 
 RowGroupMetaDataBuilder::RowGroupMetaDataBuilder(
-    int64_t num_rows, const std::shared_ptr<WriterProperties>& props,
-    const SchemaDescriptor* schema_, uint8_t* contents)
+    const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
+    uint8_t* contents)
     : impl_{std::unique_ptr<RowGroupMetaDataBuilderImpl>(
-          new RowGroupMetaDataBuilderImpl(num_rows, props, schema_, contents))} {}
+          new RowGroupMetaDataBuilderImpl(props, schema_, contents))} {}
 
 RowGroupMetaDataBuilder::~RowGroupMetaDataBuilder() {}
 
@@ -737,6 +739,12 @@
 
 int RowGroupMetaDataBuilder::num_columns() { return impl_->num_columns(); }
 
+int64_t RowGroupMetaDataBuilder::num_rows() { return impl_->num_rows(); }
+
+void RowGroupMetaDataBuilder::set_num_rows(int64_t num_rows) {
+  impl_->set_num_rows(num_rows);
+}
+
 void RowGroupMetaDataBuilder::Finish(int64_t total_bytes_written) {
   impl_->Finish(total_bytes_written);
 }
@@ -753,10 +761,10 @@
   }
   ~FileMetaDataBuilderImpl() {}
 
-  RowGroupMetaDataBuilder* AppendRowGroup(int64_t num_rows) {
+  RowGroupMetaDataBuilder* AppendRowGroup() {
     auto row_group = std::unique_ptr<format::RowGroup>(new format::RowGroup());
     auto row_group_builder = RowGroupMetaDataBuilder::Make(
-        num_rows, properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
+        properties_, schema_, reinterpret_cast<uint8_t*>(row_group.get()));
     RowGroupMetaDataBuilder* row_group_ptr = row_group_builder.get();
     row_group_builders_.push_back(std::move(row_group_builder));
     row_groups_.push_back(std::move(row_group));
@@ -836,8 +844,8 @@
 
 FileMetaDataBuilder::~FileMetaDataBuilder() {}
 
-RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup(int64_t num_rows) {
-  return impl_->AppendRowGroup(num_rows);
+RowGroupMetaDataBuilder* FileMetaDataBuilder::AppendRowGroup() {
+  return impl_->AppendRowGroup();
 }
 
 std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return impl_->Finish(); }
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 7f990e0..7850358 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -224,21 +224,23 @@
  public:
   // API convenience to get a MetaData reader
   static std::unique_ptr<RowGroupMetaDataBuilder> Make(
-      int64_t num_rows, const std::shared_ptr<WriterProperties>& props,
-      const SchemaDescriptor* schema_, uint8_t* contents);
+      const std::shared_ptr<WriterProperties>& props, const SchemaDescriptor* schema_,
+      uint8_t* contents);
 
   ~RowGroupMetaDataBuilder();
 
   ColumnChunkMetaDataBuilder* NextColumnChunk();
   int num_columns();
+  int64_t num_rows();
   int current_column() const;
 
+  void set_num_rows(int64_t num_rows);
+
   // commit the metadata
   void Finish(int64_t total_bytes_written);
 
  private:
-  explicit RowGroupMetaDataBuilder(int64_t num_rows,
-                                   const std::shared_ptr<WriterProperties>& props,
+  explicit RowGroupMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
                                    const SchemaDescriptor* schema_, uint8_t* contents);
   // PIMPL Idiom
   class RowGroupMetaDataBuilderImpl;
@@ -254,7 +256,7 @@
 
   ~FileMetaDataBuilder();
 
-  RowGroupMetaDataBuilder* AppendRowGroup(int64_t num_rows);
+  RowGroupMetaDataBuilder* AppendRowGroup();
 
   // commit the metadata
   std::unique_ptr<FileMetaData> Finish();
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index 5702d2c..712289b 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -174,9 +174,31 @@
 
 int RowGroupSerializer::num_columns() const { return metadata_->num_columns(); }
 
-int64_t RowGroupSerializer::num_rows() const { return num_rows_; }
+int64_t RowGroupSerializer::num_rows() const {
+  if (current_column_writer_) {
+    CheckRowsWritten();
+  }
+  return num_rows_ < 0 ? 0 : num_rows_;
+}
+
+void RowGroupSerializer::CheckRowsWritten() const {
+  int64_t current_rows = current_column_writer_->rows_written();
+  if (num_rows_ < 0) {
+    num_rows_ = current_rows;
+    metadata_->set_num_rows(current_rows);
+  } else if (num_rows_ != current_rows) {
+    std::stringstream ss;
+    ss << "Column " << current_column_index_ << " had " << current_rows
+       << " while previous column had " << num_rows_;
+    throw ParquetException(ss.str());
+  }
+}
 
 ColumnWriter* RowGroupSerializer::NextColumn() {
+  if (current_column_writer_) {
+    CheckRowsWritten();
+  }
+
   // Throws an error if more columns are being written
   auto col_meta = metadata_->NextColumnChunk();
 
@@ -184,12 +206,13 @@
     total_bytes_written_ += current_column_writer_->Close();
   }
 
+  ++current_column_index_;
+
   const ColumnDescriptor* column_descr = col_meta->descr();
   std::unique_ptr<PageWriter> pager(
       new SerializedPageWriter(sink_, properties_->compression(column_descr->path()),
                                col_meta, properties_->memory_pool()));
-  current_column_writer_ =
-      ColumnWriter::Make(col_meta, std::move(pager), num_rows_, properties_);
+  current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), properties_);
   return current_column_writer_.get();
 }
 
@@ -200,9 +223,11 @@
     closed_ = true;
 
     if (current_column_writer_) {
+      CheckRowsWritten();
       total_bytes_written_ += current_column_writer_->Close();
       current_column_writer_.reset();
     }
+
     // Ensures all columns have been written
     metadata_->Finish(total_bytes_written_);
   }
@@ -224,6 +249,7 @@
 void FileSerializer::Close() {
   if (is_open_) {
     if (row_group_writer_) {
+      num_rows_ += row_group_writer_->num_rows();
       row_group_writer_->Close();
     }
     row_group_writer_.reset();
@@ -246,15 +272,14 @@
   return properties_;
 }
 
-RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) {
+RowGroupWriter* FileSerializer::AppendRowGroup() {
   if (row_group_writer_) {
     row_group_writer_->Close();
   }
-  num_rows_ += num_rows;
   num_row_groups_++;
-  auto rg_metadata = metadata_->AppendRowGroup(num_rows);
+  auto rg_metadata = metadata_->AppendRowGroup();
   std::unique_ptr<RowGroupWriter::Contents> contents(
-      new RowGroupSerializer(num_rows, sink_.get(), rg_metadata, properties_.get()));
+      new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
   row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
   return row_group_writer_.get();
 }
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 5aba994..3cd73fe 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -75,15 +75,15 @@
 // RowGroupWriter::Contents implementation for the Parquet file specification
 class RowGroupSerializer : public RowGroupWriter::Contents {
  public:
-  RowGroupSerializer(int64_t num_rows, OutputStream* sink,
-                     RowGroupMetaDataBuilder* metadata,
+  RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
                      const WriterProperties* properties)
-      : num_rows_(num_rows),
-        sink_(sink),
+      : sink_(sink),
         metadata_(metadata),
         properties_(properties),
         total_bytes_written_(0),
-        closed_(false) {}
+        closed_(false),
+        current_column_index_(0),
+        num_rows_(-1) {}
 
   int num_columns() const override;
   int64_t num_rows() const override;
@@ -93,12 +93,15 @@
   void Close() override;
 
  private:
-  int64_t num_rows_;
   OutputStream* sink_;
-  RowGroupMetaDataBuilder* metadata_;
+  mutable RowGroupMetaDataBuilder* metadata_;
   const WriterProperties* properties_;
   int64_t total_bytes_written_;
   bool closed_;
+  int current_column_index_;
+  mutable int64_t num_rows_;
+
+  void CheckRowsWritten() const;
 
   std::shared_ptr<ColumnWriter> current_column_writer_;
 };
@@ -116,7 +119,7 @@
 
   void Close() override;
 
-  RowGroupWriter* AppendRowGroup(int64_t num_rows) override;
+  RowGroupWriter* AppendRowGroup() override;
 
   const std::shared_ptr<WriterProperties>& properties() const override;
 
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
index a1b9227..a91553b 100644
--- a/src/parquet/file/writer.cc
+++ b/src/parquet/file/writer.cc
@@ -33,7 +33,6 @@
 void RowGroupWriter::Close() {
   if (contents_) {
     contents_->Close();
-    contents_.reset();
   }
 }
 
@@ -105,8 +104,12 @@
   }
 }
 
+RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
+  return contents_->AppendRowGroup();
+}
+
 RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
-  return contents_->AppendRowGroup(num_rows);
+  return AppendRowGroup();
 }
 
 const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index c2b3f91..844aa16 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -56,14 +56,12 @@
 
   explicit RowGroupWriter(std::unique_ptr<Contents> contents);
 
-  /**
-   * Construct a ColumnWriter for the indicated row group-relative column.
-   *
-   * Ownership is solely within the RowGroupWriter. The ColumnWriter is only valid
-   * until the next call to NextColumn or Close. As the contents are directly written to
-   * the sink, once a new column is started, the contents of the previous one cannot be
-   * modified anymore.
-   */
+  /// Construct a ColumnWriter for the indicated row group-relative column.
+  ///
+  /// Ownership is solely within the RowGroupWriter. The ColumnWriter is only
+  /// valid until the next call to NextColumn or Close. As the contents are
+  /// directly written to the sink, once a new column is started, the contents
+  /// of the previous one cannot be modified anymore.
   ColumnWriter* NextColumn();
   /// Index of currently written column
   int current_column();
@@ -96,7 +94,10 @@
     // Perform any cleanup associated with the file contents
     virtual void Close() = 0;
 
-    virtual RowGroupWriter* AppendRowGroup(int64_t num_rows) = 0;
+    /// \deprecated Since 1.3.0
+    RowGroupWriter* AppendRowGroup(int64_t num_rows);
+
+    virtual RowGroupWriter* AppendRowGroup() = 0;
 
     virtual int64_t num_rows() const = 0;
     virtual int num_columns() const = 0;
@@ -135,54 +136,45 @@
   void Open(std::unique_ptr<Contents> contents);
   void Close();
 
-  /**
-   * Construct a RowGroupWriter for the indicated number of rows.
-   *
-   * Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
-   * until the next call to AppendRowGroup or Close.
-   *
-   * @param num_rows The number of rows that are stored in the new RowGroup
-   */
+  // Construct a RowGroupWriter for the indicated number of rows.
+  //
+  // Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
+  // until the next call to AppendRowGroup or Close.
+  // @param num_rows The number of rows that are stored in the new RowGroup
+  //
+  // \deprecated Since 1.3.0
   RowGroupWriter* AppendRowGroup(int64_t num_rows);
 
-  /**
-   * Number of columns.
-   *
-   * This number is fixed during the lifetime of the writer as it is determined via
-   * the schema.
-   */
+  /// Construct a RowGroupWriter with an arbitrary number of rows.
+  ///
+  /// Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
+  /// until the next call to AppendRowGroup or Close.
+  RowGroupWriter* AppendRowGroup();
+
+  /// Number of columns.
+  ///
+  /// This number is fixed during the lifetime of the writer as it is determined via
+  /// the schema.
   int num_columns() const;
 
-  /**
-   * Number of rows in the yet started RowGroups.
-   *
-   * Changes on the addition of a new RowGroup.
-   */
+  /// Number of rows in the yet started RowGroups.
+  ///
+  /// Changes on the addition of a new RowGroup.
   int64_t num_rows() const;
 
-  /**
-   * Number of started RowGroups.
-   */
+  /// Number of started RowGroups.
   int num_row_groups() const;
 
-  /**
-   * Configuration passed to the writer, e.g. the used Parquet format version.
-   */
+  /// Configuration passed to the writer, e.g. the used Parquet format version.
   const std::shared_ptr<WriterProperties>& properties() const;
 
-  /**
-    * Returns the file schema descriptor
-    */
+  /// Returns the file schema descriptor
   const SchemaDescriptor* schema() const;
 
-  /**
-   * Returns a column descriptor in schema
-   */
+  /// Returns a column descriptor in schema
   const ColumnDescriptor* descr(int i) const;
 
-  /**
-   * Returns the file custom metadata
-   */
+  /// Returns the file custom metadata
   const std::shared_ptr<const KeyValueMetadata>& key_value_metadata() const;
 
  private:
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index 1521cbd..26d53d3 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -138,7 +138,7 @@
     std::shared_ptr<WriterProperties> writer_properties =
         WriterProperties::Builder().enable_statistics("column")->build();
     auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
-    auto row_group_writer = file_writer->AppendRowGroup(num_values);
+    auto row_group_writer = file_writer->AppendRowGroup();
     auto column_writer =
         static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
 
@@ -438,7 +438,7 @@
     auto file_writer = parquet::ParquetFileWriter::Open(parquet_sink_, schema_, props);
 
     // Append a RowGroup with a specific number of rows.
-    auto rg_writer = file_writer->AppendRowGroup(NUM_VALUES);
+    auto rg_writer = file_writer->AppendRowGroup();
 
     this->SetValues();