PARQUET-1378: Allow RowGroups with zero rows to be written

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes #485 from majetideepak/PARQUET-1378 and squashes the following commits:

a4db300 [Deepak Majeti] Parquet-1378: Allow RowGroups with zero rows to be written
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 48fba55..934530c 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -432,12 +432,13 @@
     FlushBufferedDataPages();
 
     EncodedStatistics chunk_statistics = GetChunkStatistics();
+    // Write stats only if the column has at least one row written
     // From parquet-mr
     // Don't write stats larger than the max size rather than truncating. The
     // rationale is that some engines may use the minimum value in the page as
     // the true minimum for aggregations and there is no way to mark that a
     // value has been truncated and is a lower bound and not in the page.
-    if (chunk_statistics.is_set() &&
+    if (rows_written_ > 0 && chunk_statistics.is_set() &&
         chunk_statistics.max_stat_length() <=
             properties_->max_statistics_size(descr_->path())) {
       metadata_->SetStatistics(SortOrder::SIGNED == descr_->sort_order(),
diff --git a/src/parquet/file-serialize-test.cc b/src/parquet/file-serialize-test.cc
index 31d2bd4..1993404 100644
--- a/src/parquet/file-serialize-test.cc
+++ b/src/parquet/file-serialize-test.cc
@@ -176,6 +176,27 @@
       column_writer->Close();
     }
   }
+
+  void ZeroRowsRowGroup() {  // Write one row group with no values in any column.
+    std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
+    auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
+
+    std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
+
+    auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
+
+    RowGroupWriter* row_group_writer;
+    row_group_writer = file_writer->AppendRowGroup();
+    // Close every column writer straight away, without writing any values.
+    for (int col = 0; col < num_columns_; ++col) {
+      auto column_writer =
+          static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+      column_writer->Close();
+    }
+    // The ZeroRows test wraps this helper and expects none of these calls to throw.
+    row_group_writer->Close();
+    file_writer->Close();
+  }
 };
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
@@ -198,6 +219,8 @@
   ASSERT_THROW(this->UnequalNumRows(101, num_rows), ParquetException);
 }
 
+TYPED_TEST(TestSerialize, ZeroRows) { ASSERT_NO_THROW(this->ZeroRowsRowGroup()); }
+
 TYPED_TEST(TestSerialize, RepeatedTooFewRows) {
   ASSERT_THROW(this->RepeatedUnequalRows(), ParquetException);
 }
diff --git a/src/parquet/metadata.cc b/src/parquet/metadata.cc
index 39dee63..1cab51f 100644
--- a/src/parquet/metadata.cc
+++ b/src/parquet/metadata.cc
@@ -731,7 +731,7 @@
     int64_t total_byte_size = 0;
 
     for (int i = 0; i < schema_->num_columns(); i++) {
-      if (!(row_group_->columns[i].file_offset > 0)) {
+      if (!(row_group_->columns[i].file_offset >= 0)) {
         std::stringstream ss;
         ss << "Column " << i << " is not complete.";
         throw ParquetException(ss.str());