PARQUET-918: Keep ordering in column indices when converting Parquet Schema
This is a follow-up fix for [PARQUET-918](https://github.com/apache/parquet-cpp/pull/295). Do I need to create another JIRA for this?
It looks like some .idea files were included by accident. They appear to be harmless. Do I need to revert them? @wesm
cc @wesm @itaiin for reviewing
Author: Xianjin YE <advancedxy@gmail.com>
Closes #297 from advancedxy/master and squashes the following commits:
e606d9d [Xianjin YE] Add .idea/ to .gitignore and make style check happy.
1adb192 [Xianjin YE] Add API doc for FromParquetSchema(parquet_schema, column_indices, out)
8de263b [Xianjin YE] Keep ordering in column indices when converting Parquet Schema to Arrow Schema
diff --git a/.gitignore b/.gitignore
index aeb80e1..9de56ea 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,3 +13,4 @@
thirdparty
*.pc
+.idea/
\ No newline at end of file
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index 85578ac..0f6b455 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -62,8 +62,8 @@
for (int i = 0; i < expected_schema->num_fields(); ++i) {
auto lhs = result_schema_->field(i);
auto rhs = expected_schema->field(i);
- EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString()
- << " != " << rhs->ToString();
+ EXPECT_TRUE(lhs->Equals(rhs))
+ << i << " " << lhs->ToString() << " != " << rhs->ToString();
}
}
@@ -433,6 +433,54 @@
CheckFlatSchema(arrow_schema);
}
+TEST_F(TestConvertParquetSchema, ParquetNestedSchemaPartialOrdering) {
+ std::vector<NodePtr> parquet_fields;
+ std::vector<std::shared_ptr<Field>> arrow_fields;
+
+ // Full Parquet schema (leaf1..leaf5 are selected below as column indices 0..4):
+ // required group group1 {
+ // required int64 leaf1;
+ // required int64 leaf2;
+ // }
+ // required group group2 {
+ // required int64 leaf3;
+ // required int64 leaf4;
+ // }
+ // required int64 leaf5;
+ //
+ // Expected partial Arrow schema for column indices {3, 4, 0}, in the order requested:
+ // required group group2 {
+ // required int64 leaf4;
+ // }
+ // required int64 leaf5;
+ // required group group1 {
+ // required int64 leaf1;
+ // }
+ {
+ parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED,
+ {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT64),
+ PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT64)}));
+ parquet_fields.push_back(GroupNode::Make("group2", Repetition::REQUIRED,
+ {PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT64),
+ PrimitiveNode::Make("leaf4", Repetition::REQUIRED, ParquetType::INT64)}));
+ parquet_fields.push_back(
+ PrimitiveNode::Make("leaf5", Repetition::REQUIRED, ParquetType::INT64));
+
+ auto group1_fields = {std::make_shared<Field>("leaf1", INT64, false)};
+ auto arrow_group1_type = std::make_shared<::arrow::StructType>(group1_fields);
+ auto group2_fields = {std::make_shared<Field>("leaf4", INT64, false)};
+ auto arrow_group2_type = std::make_shared<::arrow::StructType>(group2_fields);
+
+ arrow_fields.push_back(std::make_shared<Field>("group2", arrow_group2_type, false));
+ arrow_fields.push_back(std::make_shared<Field>("leaf5", INT64, false));
+ arrow_fields.push_back(std::make_shared<Field>("group1", arrow_group1_type, false));
+ }
+
+ auto arrow_schema = std::make_shared<::arrow::Schema>(arrow_fields);
+ ASSERT_OK(ConvertSchema(parquet_fields, {3, 4, 0}));
+
+ CheckFlatSchema(arrow_schema);
+}
TEST_F(TestConvertParquetSchema, ParquetRepeatedNestedSchema) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 2c74839..25713a7 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -330,21 +330,26 @@
const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out) {
// TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
// from the root Parquet node
- const GroupNode* schema_node = parquet_schema->group_node();
// Put the right leaf nodes in an unordered set
+ // Indices in column_indices should be unique; duplicate indices are merged into one,
+ // keeping the position of the first occurrence.
int num_columns = static_cast<int>(column_indices.size());
+ std::unordered_set<NodePtr> top_nodes; // to deduplicate the top nodes
+ std::vector<NodePtr> base_nodes; // to keep the ordering
std::unordered_set<NodePtr> included_leaf_nodes(num_columns);
for (int i = 0; i < num_columns; i++) {
auto column_desc = parquet_schema->Column(column_indices[i]);
included_leaf_nodes.insert(column_desc->schema_node());
+ auto column_root = parquet_schema->GetColumnRoot(column_indices[i]);
+ auto insertion = top_nodes.insert(column_root);
+ if (insertion.second) { base_nodes.push_back(column_root); }
}
std::vector<std::shared_ptr<Field>> fields;
std::shared_ptr<Field> field;
- for (int i = 0; i < schema_node->field_count(); i++) {
- RETURN_NOT_OK(
- NodeToFieldInternal(schema_node->field(i), &included_leaf_nodes, &field));
+ for (auto node : base_nodes) {
+ RETURN_NOT_OK(NodeToFieldInternal(node, &included_leaf_nodes, &field));
if (field != nullptr) { fields.push_back(field); }
}
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index b93f088..1866fea 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -39,6 +39,13 @@
::arrow::Status PARQUET_EXPORT NodeToField(
const schema::NodePtr& node, std::shared_ptr<::arrow::Field>* out);
+/// Convert a Parquet schema to an Arrow schema containing only the selected columns.
+/// \param parquet_schema the Parquet schema to convert
+/// \param column_indices indices of leaf nodes in the Parquet schema tree. Top-level
+/// fields of the result are ordered by the first appearance of one of their leaves;
+/// repeated indices have no further effect
+/// \param out the resulting Arrow schema
+/// \return Status::OK() on a successful conversion.
::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,
const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out);