PARQUET-953: Add static constructors to arrow::FileWriter for initializing from schema, add WriteTable method

I preserved the existing top-level WriteTable methods; this change will unblock ARROW-528.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #296 from wesm/PARQUET-953 and squashes the following commits:

127edaa [Wes McKinney] Make FileWriter ctor public again
7c921f3 [Wes McKinney] cpplint
b825f0b [Wes McKinney] Add static constructors to arrow::FileWriter for initializing from arrow::Schema
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 2f8f421..0bdc14d 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -23,9 +23,9 @@
 #include "parquet/api/writer.h"
 
 #include "parquet/arrow/reader.h"
+#include "parquet/arrow/schema.h"
 #include "parquet/arrow/test-util.h"
 #include "parquet/arrow/writer.h"
-#include "parquet/arrow/schema.h"
 
 #include "parquet/file/writer.h"
 
@@ -890,17 +890,17 @@
   void InitReader(std::shared_ptr<FileReader>* out) {
     std::shared_ptr<Buffer> buffer = nested_parquet_->GetBuffer();
     std::unique_ptr<FileReader> reader;
-    ASSERT_OK_NO_THROW(OpenFile(
-      std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
-      ::parquet::default_reader_properties(), nullptr, &reader));
+    ASSERT_OK_NO_THROW(
+        OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+            ::parquet::default_reader_properties(), nullptr, &reader));
 
     *out = std::move(reader);
   }
 
   void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
     nested_parquet_ = std::make_shared<InMemoryOutputStream>();
-    writer_ = parquet::ParquetFileWriter::Open(nested_parquet_,
-       schema, default_writer_properties());
+    writer_ = parquet::ParquetFileWriter::Open(
+        nested_parquet_, schema, default_writer_properties());
     row_group_writer_ = writer_->AppendRowGroup(num_rows);
   }
 
@@ -920,14 +920,11 @@
     // }
     // required int32 leaf3;
 
+    parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED,
+        {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT32),
+            PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
     parquet_fields.push_back(
-        GroupNode::Make("group1", Repetition::REQUIRED, {
-          PrimitiveNode::Make(
-            "leaf1", Repetition::REQUIRED, ParquetType::INT32),
-          PrimitiveNode::Make(
-            "leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
-    parquet_fields.push_back(PrimitiveNode::Make(
-        "leaf3", Repetition::REQUIRED, ParquetType::INT32));
+        PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT32));
 
     const int num_columns = 3;
     auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 96de92e..85578ac 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -73,8 +73,8 @@
     return FromParquetSchema(&descr_, &result_schema_);
   }
 
-  ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes,
-        const std::vector<int>& column_indices) {
+  ::arrow::Status ConvertSchema(
+      const std::vector<NodePtr>& nodes, const std::vector<int>& column_indices) {
     NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
     descr_.Init(schema);
     return FromParquetSchema(&descr_, column_indices, &result_schema_);
@@ -365,18 +365,14 @@
   // }
   // required int64 leaf3;
   {
+    parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED,
+        {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::BOOLEAN),
+            PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
     parquet_fields.push_back(
-        GroupNode::Make("group1", Repetition::REQUIRED, {
-          PrimitiveNode::Make(
-            "leaf1", Repetition::REQUIRED, ParquetType::BOOLEAN),
-          PrimitiveNode::Make(
-            "leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
-    parquet_fields.push_back(PrimitiveNode::Make(
-        "leaf3", Repetition::REQUIRED, ParquetType::INT64));
+        PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64));
 
-    auto group1_fields = {
-      std::make_shared<Field>("leaf1", BOOL, false),
-      std::make_shared<Field>("leaf2", INT32, false)};
+    auto group1_fields = {std::make_shared<Field>("leaf1", BOOL, false),
+        std::make_shared<Field>("leaf2", INT32, false)};
     auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
     arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
     arrow_fields.push_back(std::make_shared<Field>("leaf3", INT64, false));
@@ -412,20 +408,14 @@
   // }
   // required int64 leaf5;
   {
+    parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED,
+        {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT64),
+            PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT64)}));
+    parquet_fields.push_back(GroupNode::Make("group2", Repetition::REQUIRED,
+        {PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64),
+            PrimitiveNode::Make("leaf4", Repetition::REQUIRED, ParquetType::INT64)}));
     parquet_fields.push_back(
-        GroupNode::Make("group1", Repetition::REQUIRED, {
-          PrimitiveNode::Make(
-            "leaf1", Repetition::REQUIRED, ParquetType::INT64),
-          PrimitiveNode::Make(
-            "leaf2", Repetition::REQUIRED, ParquetType::INT64)}));
-    parquet_fields.push_back(
-        GroupNode::Make("group2", Repetition::REQUIRED, {
-          PrimitiveNode::Make(
-            "leaf3", Repetition::REQUIRED, ParquetType::INT64),
-          PrimitiveNode::Make(
-            "leaf4", Repetition::REQUIRED, ParquetType::INT64)}));
-    parquet_fields.push_back(PrimitiveNode::Make(
-        "leaf5", Repetition::REQUIRED, ParquetType::INT64));
+        PrimitiveNode::Make("leaf5", Repetition::REQUIRED, ParquetType::INT64));
 
     auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)};
     auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
@@ -456,26 +446,24 @@
     //   }
     parquet_fields.push_back(
         PrimitiveNode::Make("leaf1", Repetition::OPTIONAL, ParquetType::INT32));
-    parquet_fields.push_back(
-      GroupNode::Make("outerGroup", Repetition::REPEATED, {
-        PrimitiveNode::Make(
-          "leaf2", Repetition::OPTIONAL, ParquetType::INT32),
-        GroupNode::Make("innerGroup", Repetition::REPEATED, {
-          PrimitiveNode::Make(
-            "leaf3", Repetition::OPTIONAL, ParquetType::INT32)})}));
+    parquet_fields.push_back(GroupNode::Make("outerGroup", Repetition::REPEATED,
+        {PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, ParquetType::INT32),
+            GroupNode::Make("innerGroup", Repetition::REPEATED,
+                {PrimitiveNode::Make(
+                    "leaf3", Repetition::OPTIONAL, ParquetType::INT32)})}));
 
     auto inner_group_fields = {std::make_shared<Field>("leaf3", INT32, true)};
     auto inner_group_type = std::make_shared<::arrow::StructType>(inner_group_fields);
-    auto outer_group_fields = {
-        std::make_shared<Field>("leaf2", INT32, true),
-        std::make_shared<Field>("innerGroup", ::arrow::list(
-            std::make_shared<Field>("innerGroup", inner_group_type, false)), false)};
+    auto outer_group_fields = {std::make_shared<Field>("leaf2", INT32, true),
+        std::make_shared<Field>("innerGroup",
+            ::arrow::list(std::make_shared<Field>("innerGroup", inner_group_type, false)),
+            false)};
     auto outer_group_type = std::make_shared<::arrow::StructType>(outer_group_fields);
 
     arrow_fields.push_back(std::make_shared<Field>("leaf1", INT32, true));
-    arrow_fields.push_back(
-        std::make_shared<Field>("outerGroup", ::arrow::list(
-            std::make_shared<Field>("outerGroup", outer_group_type, false)), false));
+    arrow_fields.push_back(std::make_shared<Field>("outerGroup",
+        ::arrow::list(std::make_shared<Field>("outerGroup", outer_group_type, false)),
+        false));
   }
   auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
   ASSERT_OK(ConvertSchema(parquet_fields));
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index e589581..2c74839 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -18,8 +18,8 @@
 #include "parquet/arrow/schema.h"
 
 #include <string>
-#include <vector>
 #include <unordered_set>
+#include <vector>
 
 #include "parquet/api/schema.h"
 
@@ -192,11 +192,9 @@
  * Auxilary function to test if a parquet schema node is a leaf node
  * that should be included in a resulting arrow schema
  */
-inline bool IsIncludedLeaf(const NodePtr& node,
-    const std::unordered_set<NodePtr>* included_leaf_nodes) {
-  if (included_leaf_nodes == nullptr) {
-    return true;
-  }
+inline bool IsIncludedLeaf(
+    const NodePtr& node, const std::unordered_set<NodePtr>* included_leaf_nodes) {
+  if (included_leaf_nodes == nullptr) { return true; }
   auto search = included_leaf_nodes->find(node);
   return (search != included_leaf_nodes->end());
 }
@@ -210,13 +208,9 @@
 
   for (int i = 0; i < group->field_count(); i++) {
     RETURN_NOT_OK(NodeToFieldInternal(group->field(i), included_leaf_nodes, &field));
-    if (field != nullptr) {
-      fields.push_back(field);
-    }
+    if (field != nullptr) { fields.push_back(field); }
   }
-  if (fields.size() > 0) {
-    *out = std::make_shared<::arrow::StructType>(fields);
-  }
+  if (fields.size() > 0) { *out = std::make_shared<::arrow::StructType>(fields); }
   return Status::OK();
 }
 
@@ -240,12 +234,10 @@
           !str_endswith_tuple(list_node->name())) {
         // List of primitive type
         std::shared_ptr<Field> item_field;
-        RETURN_NOT_OK(NodeToFieldInternal(
-          list_group->field(0), included_leaf_nodes, &item_field));
+        RETURN_NOT_OK(
+            NodeToFieldInternal(list_group->field(0), included_leaf_nodes, &item_field));
 
-        if (item_field != nullptr) {
-          *out = ::arrow::list(item_field);
-        }
+        if (item_field != nullptr) { *out = ::arrow::list(item_field); }
       } else {
         // List of struct
         std::shared_ptr<::arrow::DataType> inner_type;
@@ -260,7 +252,7 @@
       std::shared_ptr<::arrow::DataType> inner_type;
       if (IsIncludedLeaf(static_cast<NodePtr>(list_node), included_leaf_nodes)) {
         const PrimitiveNode* primitive =
-          static_cast<const PrimitiveNode*>(list_node.get());
+            static_cast<const PrimitiveNode*>(list_node.get());
         RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
         auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
         *out = ::arrow::list(item_field);
@@ -282,7 +274,6 @@
 
 Status NodeToFieldInternal(const NodePtr& node,
     const std::unordered_set<NodePtr>* included_leaf_nodes, std::shared_ptr<Field>* out) {
-
   std::shared_ptr<::arrow::DataType> type = nullptr;
   bool nullable = !node->is_required();
 
@@ -317,9 +308,7 @@
       RETURN_NOT_OK(FromPrimitive(primitive, &type));
     }
   }
-  if (type != nullptr) {
-    *out = std::make_shared<Field>(node->name(), type, nullable);
-  }
+  if (type != nullptr) { *out = std::make_shared<Field>(node->name(), type, nullable); }
   return Status::OK();
 }
 
@@ -354,11 +343,9 @@
   std::vector<std::shared_ptr<Field>> fields;
   std::shared_ptr<Field> field;
   for (int i = 0; i < schema_node->field_count(); i++) {
-    RETURN_NOT_OK(NodeToFieldInternal(
-      schema_node->field(i), &included_leaf_nodes, &field));
-    if (field != nullptr) {
-      fields.push_back(field);
-    }
+    RETURN_NOT_OK(
+        NodeToFieldInternal(schema_node->field(i), &included_leaf_nodes, &field));
+    if (field != nullptr) { fields.push_back(field); }
   }
 
   *out = std::make_shared<::arrow::Schema>(fields);
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 5933937..90cd135 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -496,9 +496,6 @@
   return Status::OK();
 }
 
-FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
-    : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
-
 Status FileWriter::NewRowGroup(int64_t chunk_size) {
   return impl_->NewRowGroup(chunk_size);
 }
@@ -589,16 +586,33 @@
 
 FileWriter::~FileWriter() {}
 
-Status WriteTable(const Table& table, MemoryPool* pool,
-    const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
-    const std::shared_ptr<WriterProperties>& properties) {
-  std::shared_ptr<SchemaDescriptor> parquet_schema;
-  RETURN_NOT_OK(ToParquetSchema(table.schema().get(), *properties, &parquet_schema));
-  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
-  std::unique_ptr<ParquetFileWriter> parquet_writer =
-      ParquetFileWriter::Open(sink, schema_node, properties);
-  FileWriter writer(pool, std::move(parquet_writer));
+FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
+    : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
 
+Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+    const std::shared_ptr<OutputStream>& sink,
+    const std::shared_ptr<WriterProperties>& properties,
+    std::unique_ptr<FileWriter>* writer) {
+  std::shared_ptr<SchemaDescriptor> parquet_schema;
+  RETURN_NOT_OK(ToParquetSchema(&schema, *properties, &parquet_schema));
+
+  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+  std::unique_ptr<ParquetFileWriter> base_writer =
+      ParquetFileWriter::Open(sink, schema_node, properties);
+
+  writer->reset(new FileWriter(pool, std::move(base_writer)));
+  return Status::OK();
+}
+
+Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+    const std::shared_ptr<::arrow::io::OutputStream>& sink,
+    const std::shared_ptr<WriterProperties>& properties,
+    std::unique_ptr<FileWriter>* writer) {
+  auto wrapper = std::make_shared<ArrowOutputStream>(sink);
+  return Open(schema, pool, wrapper, properties, writer);
+}
+
+Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
   // TODO(ARROW-232) Support writing chunked arrays.
   for (int i = 0; i < table.num_columns(); i++) {
     if (table.column(i)->data()->num_chunks() != 1) {
@@ -609,19 +623,26 @@
   for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
     int64_t offset = chunk * chunk_size;
     int64_t size = std::min(chunk_size, table.num_rows() - offset);
-    RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
+    RETURN_NOT_OK_ELSE(NewRowGroup(size), PARQUET_IGNORE_NOT_OK(Close()));
     for (int i = 0; i < table.num_columns(); i++) {
       std::shared_ptr<Array> array = table.column(i)->data()->chunk(0);
       array = array->Slice(offset, size);
-      RETURN_NOT_OK_ELSE(
-          writer.WriteColumnChunk(*array), PARQUET_IGNORE_NOT_OK(writer.Close()));
+      RETURN_NOT_OK_ELSE(WriteColumnChunk(*array), PARQUET_IGNORE_NOT_OK(Close()));
     }
   }
-
-  return writer.Close();
+  return Status::OK();
 }
 
-Status WriteTable(const Table& table, MemoryPool* pool,
+Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
+    const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
+    const std::shared_ptr<WriterProperties>& properties) {
+  std::unique_ptr<FileWriter> writer;
+  RETURN_NOT_OK(FileWriter::Open(*table.schema(), pool, sink, properties, &writer));
+  RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
+  return writer->Close();
+}
+
+Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
     const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
     const std::shared_ptr<WriterProperties>& properties) {
   auto wrapper = std::make_shared<ArrowOutputStream>(sink);
@@ -629,5 +650,4 @@
 }
 
 }  // namespace arrow
-
 }  // namespace parquet
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index e3b281b..3916298 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -31,13 +31,13 @@
 class MemoryPool;
 class PrimitiveArray;
 class RowBatch;
+class Schema;
 class Status;
 class StringArray;
 class Table;
-}
+}  // namespace arrow
 
 namespace parquet {
-
 namespace arrow {
 
 /**
@@ -49,6 +49,23 @@
  public:
   FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
 
+  static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+      const std::shared_ptr<OutputStream>& sink,
+      const std::shared_ptr<WriterProperties>& properties,
+      std::unique_ptr<FileWriter>* writer);
+
+  static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+      const std::shared_ptr<::arrow::io::OutputStream>& sink,
+      const std::shared_ptr<WriterProperties>& properties,
+      std::unique_ptr<FileWriter>* writer);
+
+  /**
+   * Write a Table to Parquet.
+   *
+   * The table shall only consist of columns of primitive type or of primitive lists.
+   */
+  ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);
+
   ::arrow::Status NewRowGroup(int64_t chunk_size);
   ::arrow::Status WriteColumnChunk(const ::arrow::Array& data);
   ::arrow::Status Close();
@@ -78,7 +95,6 @@
     const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
 
 }  // namespace arrow
-
 }  // namespace parquet
 
 #endif  // PARQUET_ARROW_WRITER_H