PARQUET-1245: Fix creating Arrow table with duplicate column names
Author: Antoine Pitrou <antoine@python.org>
Closes #447 from pitrou/ARROW-1974-duplicate-column-name and squashes the following commits:
a764a3c [Antoine Pitrou] Pass ColumnIndex() / FieldIndex() node argument by reference
4398d58 [Antoine Pitrou] ARROW-1974: Fix creating Arrow schema with duplicate column names
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 72e65d4..f06f4a8 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1669,6 +1669,27 @@
}
}
+TEST(TestArrowReadWrite, TableWithDuplicateColumns) {
+ // See ARROW-1974: builds a table whose two columns share one name and runs CheckSimpleRoundtrip on it.
+ using ::arrow::ArrayFromVector;
+
+ auto f0 = field("duplicate", ::arrow::int8());  // both fields use the same name...
+ auto f1 = field("duplicate", ::arrow::int16());  // ...but different integer types
+ auto schema = ::arrow::schema({f0, f1});
+
+ std::vector<int8_t> a0_values = {1, 2, 3};
+ std::vector<int16_t> a1_values = {14, 15, 16};
+
+ std::shared_ptr<Array> a0, a1;
+
+ ArrayFromVector<::arrow::Int8Type, int8_t>(a0_values, &a0);
+ ArrayFromVector<::arrow::Int16Type, int16_t>(a1_values, &a1);
+
+ auto table = Table::Make(schema, {std::make_shared<Column>(f0->name(), a0),
+ std::make_shared<Column>(f1->name(), a1)});
+ CheckSimpleRoundtrip(table, table->num_rows());  // second argument is the table's full row count
+}
+
TEST(TestArrowWrite, CheckChunkSize) {
const int num_columns = 2;
const int num_rows = 128;
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index d502d24..da6af52 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -165,6 +165,31 @@
CheckFlatSchema(arrow_schema);
}
+TEST_F(TestConvertParquetSchema, DuplicateFieldNames) {  // two Parquet fields named "xxx" must convert to two distinct Arrow fields
+ std::vector<NodePtr> parquet_fields;
+ std::vector<std::shared_ptr<Field>> arrow_fields;
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("xxx", Repetition::REQUIRED, ParquetType::BOOLEAN));
+ auto arrow_field1 = std::make_shared<Field>("xxx", BOOL, false);
+
+ parquet_fields.push_back(
+ PrimitiveNode::Make("xxx", Repetition::REQUIRED, ParquetType::INT32));  // same name as the first field, different type
+ auto arrow_field2 = std::make_shared<Field>("xxx", INT32, false);
+
+ ASSERT_OK(ConvertSchema(parquet_fields));  // conversion without an index list
+ arrow_fields = {arrow_field1, arrow_field2};
+ CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields));
+
+ ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>({0, 1})));  // index list {0, 1}: same expected field order
+ arrow_fields = {arrow_field1, arrow_field2};
+ CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields));
+
+ ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>({1, 0})));  // index list {1, 0}: expected fields are swapped
+ arrow_fields = {arrow_field2, arrow_field1};
+ CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields));
+}
+
TEST_F(TestConvertParquetSchema, ParquetKeyValueMetadata) {
std::vector<NodePtr> parquet_fields;
std::vector<std::shared_ptr<Field>> arrow_fields;
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index bd68ec3..78c3225 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -443,7 +443,7 @@
}
Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
- std::shared_ptr<Table>* table) {
+ std::shared_ptr<Table>* out) {
std::shared_ptr<::arrow::Schema> schema;
RETURN_NOT_OK(GetSchema(indices, &schema));
@@ -473,7 +473,9 @@
RETURN_NOT_OK(ParallelFor(nthreads, num_fields, ReadColumnFunc));
}
- *table = Table::Make(schema, columns);
+ std::shared_ptr<Table> table = Table::Make(schema, columns);
+ RETURN_NOT_OK(table->Validate());
+ *out = table;
return Status::OK();
}
diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc
index c8cce9f..ec9aff4 100644
--- a/src/parquet/schema-test.cc
+++ b/src/parquet/schema-test.cc
@@ -292,6 +292,17 @@
return fields;
}
+
+ NodeVector Fields2() {
+ // Three fields where the first and the last are both named "duplicate" (built via Int32 and Double)
+ NodeVector fields;
+
+ fields.push_back(Int32("duplicate", Repetition::REQUIRED));
+ fields.push_back(Int64("unique"));  // differently named field placed between the two duplicates
+ fields.push_back(Double("duplicate"));
+
+ return fields;
+ }
};
TEST_F(TestGroupNode, Attrs) {
@@ -346,14 +357,23 @@
GroupNode group("group", Repetition::REQUIRED, fields);
for (size_t i = 0; i < fields.size(); i++) {
auto field = group.field(static_cast<int>(i));
- ASSERT_EQ(i, group.FieldIndex(*field.get()));
+ ASSERT_EQ(i, group.FieldIndex(*field));
}
// Test a non field node
auto non_field_alien = Int32("alien", Repetition::REQUIRED); // other name
auto non_field_familiar = Int32("one", Repetition::REPEATED); // other node
- ASSERT_TRUE(group.FieldIndex(*non_field_alien.get()) < 0);
- ASSERT_TRUE(group.FieldIndex(*non_field_familiar.get()) < 0);
+ ASSERT_TRUE(group.FieldIndex(*non_field_alien) < 0);
+ ASSERT_TRUE(group.FieldIndex(*non_field_familiar) < 0);
+}
+
+TEST_F(TestGroupNode, FieldIndexDuplicateName) {  // FieldIndex(node) must resolve each field even when names collide
+ NodeVector fields = Fields2();
+ GroupNode group("group", Repetition::REQUIRED, fields);
+ for (size_t i = 0; i < fields.size(); i++) {
+ auto field = group.field(static_cast<int>(i));
+ ASSERT_EQ(i, group.FieldIndex(*field));  // looking a field up by its own node must return its own position
+ }
}
// ----------------------------------------------------------------------
@@ -677,14 +697,14 @@
for (int i = 0; i < nleaves; ++i) {
auto col = descr_.Column(i);
- ASSERT_EQ(i, descr_.ColumnIndex(*col->schema_node().get()));
+ ASSERT_EQ(i, descr_.ColumnIndex(*col->schema_node()));
}
// Test non-column nodes find
NodePtr non_column_alien = Int32("alien", Repetition::REQUIRED); // other path
NodePtr non_column_familiar = Int32("a", Repetition::REPEATED); // other node
- ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien.get()) < 0);
- ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar.get()) < 0);
+ ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien) < 0);
+ ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar) < 0);
ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0));
ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3));
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index cbe72c6..5c3958e 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -273,16 +273,14 @@
}
int GroupNode::FieldIndex(const Node& node) const {  // index of the field that IS `node` (same object), or -1
- int result = FieldIndex(node.name());
- if (result < 0) {
- return -1;
+ auto search = field_name_to_idx_.equal_range(node.name());  // every index registered under this name
+ for (auto it = search.first; it != search.second; ++it) {
+ const int idx = it->second;
+ if (&node == fields_[idx].get()) {  // identity match; reads fields_ directly since field() returns a NodePtr copy
+ return idx;
+ }
}
- DCHECK(result < field_count());
- if (!node.Equals(field(result).get())) {
- // Same name but not the same node
- return -1;
- }
- return result;
+ return -1;  // no field with this name is the given node object
}
void GroupNode::Visit(Node::Visitor* visitor) { visitor->Visit(this); }
@@ -721,16 +719,14 @@
}
int SchemaDescriptor::ColumnIndex(const Node& node) const {  // index of the column whose schema node IS `node`, or -1
- int result = ColumnIndex(node.path()->ToDotString());
- if (result < 0) {
- return -1;
+ auto search = leaf_to_idx_.equal_range(node.path()->ToDotString());  // every leaf index registered under this dot-string path
+ for (auto it = search.first; it != search.second; ++it) {
+ const int idx = it->second;
+ if (&node == Column(idx)->schema_node().get()) {  // compare addresses: match by identity, not Equals()
+ return idx;
+ }
}
- DCHECK(result < num_columns());
- if (!node.Equals(Column(result)->schema_node().get())) {
- // Same path but not the same node
- return -1;
- }
- return result;
+ return -1;  // no column with this path is backed by the given node object
}
const schema::Node* SchemaDescriptor::GetColumnRoot(int i) const {
diff --git a/src/parquet/schema.h b/src/parquet/schema.h
index 7b6793b..b778e51 100644
--- a/src/parquet/schema.h
+++ b/src/parquet/schema.h
@@ -264,7 +264,11 @@
bool Equals(const Node* other) const override;
NodePtr field(int i) const { return fields_[i]; }
+ // Get the index of a field by its name, or negative value if not found.
+ // If several fields share the same name, it is unspecified which one
+ // is returned.
int FieldIndex(const std::string& name) const;
+ // Get the index of a field by its node (matched by object identity, not Equals()), or negative value if not found.
int FieldIndex(const Node& node) const;
int field_count() const { return static_cast<int>(fields_.size()); }
@@ -282,7 +286,7 @@
auto field_idx = 0;
for (NodePtr& field : fields_) {
field->SetParent(this);
- field_name_to_idx_[field->name()] = field_idx++;
+ field_name_to_idx_.emplace(field->name(), field_idx++);
}
}
@@ -290,11 +294,12 @@
bool EqualsInternal(const GroupNode* other) const;
// Mapping between field name to the field index
- std::unordered_map<std::string, int> field_name_to_idx_;
+ std::unordered_multimap<std::string, int> field_name_to_idx_;
FRIEND_TEST(TestGroupNode, Attrs);
FRIEND_TEST(TestGroupNode, Equals);
FRIEND_TEST(TestGroupNode, FieldIndex);
+ FRIEND_TEST(TestGroupNode, FieldIndexDuplicateName);
};
// ----------------------------------------------------------------------
@@ -393,9 +398,11 @@
const ColumnDescriptor* Column(int i) const;
- // Get the index of a column by its dotstring path, or negative value if not found
+ // Get the index of a column by its dotstring path, or negative value if not found.
+ // If several columns share the same dotstring path, it is unspecified which one
+ // is returned.
int ColumnIndex(const std::string& node_path) const;
- // Get the index of a column by its node, or negative value if not found
+ // Get the index of a column by its node (matched by object identity, not Equals()), or negative value if not found.
int ColumnIndex(const schema::Node& node) const;
bool Equals(const SchemaDescriptor& other) const;
@@ -442,7 +449,7 @@
std::unordered_map<int, const schema::NodePtr> leaf_to_base_;
// Mapping between ColumnPath DotString to the leaf index
- std::unordered_map<std::string, int> leaf_to_idx_;
+ std::unordered_multimap<std::string, int> leaf_to_idx_;
};
} // namespace parquet
diff --git a/src/parquet/util/schema-util.h b/src/parquet/util/schema-util.h
index 4e31d3c..1c66f67 100644
--- a/src/parquet/util/schema-util.h
+++ b/src/parquet/util/schema-util.h
@@ -71,7 +71,7 @@
out->clear();
for (auto& column_idx : column_indices) {
auto field_node = descr.GetColumnRoot(column_idx);
- auto field_idx = group->FieldIndex(field_node->name());
+ auto field_idx = group->FieldIndex(*field_node);
if (field_idx < 0) {
return false;
}