PARQUET-953: Add static constructors to arrow::FileWriter for initializing from schema, add WriteTable method
I preserved the existing top-level WriteTable methods; this change will unblock ARROW-528
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes #296 from wesm/PARQUET-953 and squashes the following commits:
127edaa [Wes McKinney] Make FileWriter ctor public again
7c921f3 [Wes McKinney] cpplint
b825f0b [Wes McKinney] Add static constructors to arrow::FileWriter for initializing from arrow::Schema
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 2f8f421..0bdc14d 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -23,9 +23,9 @@
#include "parquet/api/writer.h"
#include "parquet/arrow/reader.h"
+#include "parquet/arrow/schema.h"
#include "parquet/arrow/test-util.h"
#include "parquet/arrow/writer.h"
-#include "parquet/arrow/schema.h"
#include "parquet/file/writer.h"
@@ -890,17 +890,17 @@
void InitReader(std::shared_ptr<FileReader>* out) {
std::shared_ptr<Buffer> buffer = nested_parquet_->GetBuffer();
std::unique_ptr<FileReader> reader;
- ASSERT_OK_NO_THROW(OpenFile(
- std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
+ ASSERT_OK_NO_THROW(
+ OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+ ::parquet::default_reader_properties(), nullptr, &reader));
*out = std::move(reader);
}
void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
nested_parquet_ = std::make_shared<InMemoryOutputStream>();
- writer_ = parquet::ParquetFileWriter::Open(nested_parquet_,
- schema, default_writer_properties());
+ writer_ = parquet::ParquetFileWriter::Open(
+ nested_parquet_, schema, default_writer_properties());
row_group_writer_ = writer_->AppendRowGroup(num_rows);
}
@@ -920,14 +920,11 @@
// }
// required int32 leaf3;
+ parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED,
+ {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT32),
+ PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
parquet_fields.push_back(
- GroupNode::Make("group1", Repetition::REQUIRED, {
- PrimitiveNode::Make(
- "leaf1", Repetition::REQUIRED, ParquetType::INT32),
- PrimitiveNode::Make(
- "leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
- parquet_fields.push_back(PrimitiveNode::Make(
- "leaf3", Repetition::REQUIRED, ParquetType::INT32));
+ PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT32));
const int num_columns = 3;
auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields);
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 96de92e..85578ac 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -73,8 +73,8 @@
return FromParquetSchema(&descr_, &result_schema_);
}
- ::arrow::Status ConvertSchema(const std::vector<NodePtr>& nodes,
- const std::vector<int>& column_indices) {
+ ::arrow::Status ConvertSchema(
+ const std::vector<NodePtr>& nodes, const std::vector<int>& column_indices) {
NodePtr schema = GroupNode::Make("schema", Repetition::REPEATED, nodes);
descr_.Init(schema);
return FromParquetSchema(&descr_, column_indices, &result_schema_);
@@ -365,18 +365,14 @@
// }
// required int64 leaf3;
{
+ parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED,
+ {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::BOOLEAN),
+ PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
parquet_fields.push_back(
- GroupNode::Make("group1", Repetition::REQUIRED, {
- PrimitiveNode::Make(
- "leaf1", Repetition::REQUIRED, ParquetType::BOOLEAN),
- PrimitiveNode::Make(
- "leaf2", Repetition::REQUIRED, ParquetType::INT32)}));
- parquet_fields.push_back(PrimitiveNode::Make(
- "leaf3", Repetition::REQUIRED, ParquetType::INT64));
+ PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64));
- auto group1_fields = {
- std::make_shared<Field>("leaf1", BOOL, false),
- std::make_shared<Field>("leaf2", INT32, false)};
+ auto group1_fields = {std::make_shared<Field>("leaf1", BOOL, false),
+ std::make_shared<Field>("leaf2", INT32, false)};
auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
arrow_fields.push_back(std::make_shared<Field>("leaf3", INT64, false));
@@ -412,20 +408,14 @@
// }
// required int64 leaf5;
{
+ parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED,
+ {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT64),
+ PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT64)}));
+ parquet_fields.push_back(GroupNode::Make("group2", Repetition::REQUIRED,
+ {PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64),
+ PrimitiveNode::Make("leaf4", Repetition::REQUIRED, ParquetType::INT64)}));
parquet_fields.push_back(
- GroupNode::Make("group1", Repetition::REQUIRED, {
- PrimitiveNode::Make(
- "leaf1", Repetition::REQUIRED, ParquetType::INT64),
- PrimitiveNode::Make(
- "leaf2", Repetition::REQUIRED, ParquetType::INT64)}));
- parquet_fields.push_back(
- GroupNode::Make("group2", Repetition::REQUIRED, {
- PrimitiveNode::Make(
- "leaf3", Repetition::REQUIRED, ParquetType::INT64),
- PrimitiveNode::Make(
- "leaf4", Repetition::REQUIRED, ParquetType::INT64)}));
- parquet_fields.push_back(PrimitiveNode::Make(
- "leaf5", Repetition::REQUIRED, ParquetType::INT64));
+ PrimitiveNode::Make("leaf5", Repetition::REQUIRED, ParquetType::INT64));
auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)};
auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
@@ -456,26 +446,24 @@
// }
parquet_fields.push_back(
PrimitiveNode::Make("leaf1", Repetition::OPTIONAL, ParquetType::INT32));
- parquet_fields.push_back(
- GroupNode::Make("outerGroup", Repetition::REPEATED, {
- PrimitiveNode::Make(
- "leaf2", Repetition::OPTIONAL, ParquetType::INT32),
- GroupNode::Make("innerGroup", Repetition::REPEATED, {
- PrimitiveNode::Make(
- "leaf3", Repetition::OPTIONAL, ParquetType::INT32)})}));
+ parquet_fields.push_back(GroupNode::Make("outerGroup", Repetition::REPEATED,
+ {PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, ParquetType::INT32),
+ GroupNode::Make("innerGroup", Repetition::REPEATED,
+ {PrimitiveNode::Make(
+ "leaf3", Repetition::OPTIONAL, ParquetType::INT32)})}));
auto inner_group_fields = {std::make_shared<Field>("leaf3", INT32, true)};
auto inner_group_type = std::make_shared<::arrow::StructType>(inner_group_fields);
- auto outer_group_fields = {
- std::make_shared<Field>("leaf2", INT32, true),
- std::make_shared<Field>("innerGroup", ::arrow::list(
- std::make_shared<Field>("innerGroup", inner_group_type, false)), false)};
+ auto outer_group_fields = {std::make_shared<Field>("leaf2", INT32, true),
+ std::make_shared<Field>("innerGroup",
+ ::arrow::list(std::make_shared<Field>("innerGroup", inner_group_type, false)),
+ false)};
auto outer_group_type = std::make_shared<::arrow::StructType>(outer_group_fields);
arrow_fields.push_back(std::make_shared<Field>("leaf1", INT32, true));
- arrow_fields.push_back(
- std::make_shared<Field>("outerGroup", ::arrow::list(
- std::make_shared<Field>("outerGroup", outer_group_type, false)), false));
+ arrow_fields.push_back(std::make_shared<Field>("outerGroup",
+ ::arrow::list(std::make_shared<Field>("outerGroup", outer_group_type, false)),
+ false));
}
auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
ASSERT_OK(ConvertSchema(parquet_fields));
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index e589581..2c74839 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -18,8 +18,8 @@
#include "parquet/arrow/schema.h"
#include <string>
-#include <vector>
#include <unordered_set>
+#include <vector>
#include "parquet/api/schema.h"
@@ -192,11 +192,9 @@
* Auxilary function to test if a parquet schema node is a leaf node
* that should be included in a resulting arrow schema
*/
-inline bool IsIncludedLeaf(const NodePtr& node,
- const std::unordered_set<NodePtr>* included_leaf_nodes) {
- if (included_leaf_nodes == nullptr) {
- return true;
- }
+inline bool IsIncludedLeaf(
+ const NodePtr& node, const std::unordered_set<NodePtr>* included_leaf_nodes) {
+ if (included_leaf_nodes == nullptr) { return true; }
auto search = included_leaf_nodes->find(node);
return (search != included_leaf_nodes->end());
}
@@ -210,13 +208,9 @@
for (int i = 0; i < group->field_count(); i++) {
RETURN_NOT_OK(NodeToFieldInternal(group->field(i), included_leaf_nodes, &field));
- if (field != nullptr) {
- fields.push_back(field);
- }
+ if (field != nullptr) { fields.push_back(field); }
}
- if (fields.size() > 0) {
- *out = std::make_shared<::arrow::StructType>(fields);
- }
+ if (fields.size() > 0) { *out = std::make_shared<::arrow::StructType>(fields); }
return Status::OK();
}
@@ -240,12 +234,10 @@
!str_endswith_tuple(list_node->name())) {
// List of primitive type
std::shared_ptr<Field> item_field;
- RETURN_NOT_OK(NodeToFieldInternal(
- list_group->field(0), included_leaf_nodes, &item_field));
+ RETURN_NOT_OK(
+ NodeToFieldInternal(list_group->field(0), included_leaf_nodes, &item_field));
- if (item_field != nullptr) {
- *out = ::arrow::list(item_field);
- }
+ if (item_field != nullptr) { *out = ::arrow::list(item_field); }
} else {
// List of struct
std::shared_ptr<::arrow::DataType> inner_type;
@@ -260,7 +252,7 @@
std::shared_ptr<::arrow::DataType> inner_type;
if (IsIncludedLeaf(static_cast<NodePtr>(list_node), included_leaf_nodes)) {
const PrimitiveNode* primitive =
- static_cast<const PrimitiveNode*>(list_node.get());
+ static_cast<const PrimitiveNode*>(list_node.get());
RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
*out = ::arrow::list(item_field);
@@ -282,7 +274,6 @@
Status NodeToFieldInternal(const NodePtr& node,
const std::unordered_set<NodePtr>* included_leaf_nodes, std::shared_ptr<Field>* out) {
-
std::shared_ptr<::arrow::DataType> type = nullptr;
bool nullable = !node->is_required();
@@ -317,9 +308,7 @@
RETURN_NOT_OK(FromPrimitive(primitive, &type));
}
}
- if (type != nullptr) {
- *out = std::make_shared<Field>(node->name(), type, nullable);
- }
+ if (type != nullptr) { *out = std::make_shared<Field>(node->name(), type, nullable); }
return Status::OK();
}
@@ -354,11 +343,9 @@
std::vector<std::shared_ptr<Field>> fields;
std::shared_ptr<Field> field;
for (int i = 0; i < schema_node->field_count(); i++) {
- RETURN_NOT_OK(NodeToFieldInternal(
- schema_node->field(i), &included_leaf_nodes, &field));
- if (field != nullptr) {
- fields.push_back(field);
- }
+ RETURN_NOT_OK(
+ NodeToFieldInternal(schema_node->field(i), &included_leaf_nodes, &field));
+ if (field != nullptr) { fields.push_back(field); }
}
*out = std::make_shared<::arrow::Schema>(fields);
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 5933937..90cd135 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -496,9 +496,6 @@
return Status::OK();
}
-FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
- : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
-
Status FileWriter::NewRowGroup(int64_t chunk_size) {
return impl_->NewRowGroup(chunk_size);
}
@@ -589,16 +586,33 @@
FileWriter::~FileWriter() {}
-Status WriteTable(const Table& table, MemoryPool* pool,
- const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
- const std::shared_ptr<WriterProperties>& properties) {
- std::shared_ptr<SchemaDescriptor> parquet_schema;
- RETURN_NOT_OK(ToParquetSchema(table.schema().get(), *properties, &parquet_schema));
- auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
- std::unique_ptr<ParquetFileWriter> parquet_writer =
- ParquetFileWriter::Open(sink, schema_node, properties);
- FileWriter writer(pool, std::move(parquet_writer));
+FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
+ : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
+Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+ const std::shared_ptr<OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ std::unique_ptr<FileWriter>* writer) {
+ std::shared_ptr<SchemaDescriptor> parquet_schema;
+ RETURN_NOT_OK(ToParquetSchema(&schema, *properties, &parquet_schema));
+
+ auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+ std::unique_ptr<ParquetFileWriter> base_writer =
+ ParquetFileWriter::Open(sink, schema_node, properties);
+
+ writer->reset(new FileWriter(pool, std::move(base_writer)));
+ return Status::OK();
+}
+
+Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+ const std::shared_ptr<::arrow::io::OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ std::unique_ptr<FileWriter>* writer) {
+ auto wrapper = std::make_shared<ArrowOutputStream>(sink);
+ return Open(schema, pool, wrapper, properties, writer);
+}
+
+Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
// TODO(ARROW-232) Support writing chunked arrays.
for (int i = 0; i < table.num_columns(); i++) {
if (table.column(i)->data()->num_chunks() != 1) {
@@ -609,19 +623,26 @@
for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
int64_t offset = chunk * chunk_size;
int64_t size = std::min(chunk_size, table.num_rows() - offset);
- RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
+ RETURN_NOT_OK_ELSE(NewRowGroup(size), PARQUET_IGNORE_NOT_OK(Close()));
for (int i = 0; i < table.num_columns(); i++) {
std::shared_ptr<Array> array = table.column(i)->data()->chunk(0);
array = array->Slice(offset, size);
- RETURN_NOT_OK_ELSE(
- writer.WriteColumnChunk(*array), PARQUET_IGNORE_NOT_OK(writer.Close()));
+ RETURN_NOT_OK_ELSE(WriteColumnChunk(*array), PARQUET_IGNORE_NOT_OK(Close()));
}
}
-
- return writer.Close();
+ return Status::OK();
}
-Status WriteTable(const Table& table, MemoryPool* pool,
+Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
+ const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
+ const std::shared_ptr<WriterProperties>& properties) {
+ std::unique_ptr<FileWriter> writer;
+ RETURN_NOT_OK(FileWriter::Open(*table.schema(), pool, sink, properties, &writer));
+ RETURN_NOT_OK(writer->WriteTable(table, chunk_size));
+ return writer->Close();
+}
+
+Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
const std::shared_ptr<WriterProperties>& properties) {
auto wrapper = std::make_shared<ArrowOutputStream>(sink);
@@ -629,5 +650,4 @@
}
} // namespace arrow
-
} // namespace parquet
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index e3b281b..3916298 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -31,13 +31,13 @@
class MemoryPool;
class PrimitiveArray;
class RowBatch;
+class Schema;
class Status;
class StringArray;
class Table;
-}
+} // namespace arrow
namespace parquet {
-
namespace arrow {
/**
@@ -49,6 +49,23 @@
public:
FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
+ static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+ const std::shared_ptr<OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ std::unique_ptr<FileWriter>* writer);
+
+ static ::arrow::Status Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
+ const std::shared_ptr<::arrow::io::OutputStream>& sink,
+ const std::shared_ptr<WriterProperties>& properties,
+ std::unique_ptr<FileWriter>* writer);
+
+ /**
+ * Write a Table to Parquet.
+ *
+ * The table shall only consist of columns of primitive type or of primitive lists.
+ */
+ ::arrow::Status WriteTable(const ::arrow::Table& table, int64_t chunk_size);
+
::arrow::Status NewRowGroup(int64_t chunk_size);
::arrow::Status WriteColumnChunk(const ::arrow::Array& data);
::arrow::Status Close();
@@ -78,7 +95,6 @@
const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
} // namespace arrow
-
} // namespace parquet
#endif // PARQUET_ARROW_WRITER_H