PARQUET-1245: Fix creating Arrow table with duplicate column names

Author: Antoine Pitrou <antoine@python.org>

Closes #447 from pitrou/ARROW-1974-duplicate-column-name and squashes the following commits:

a764a3c [Antoine Pitrou] Pass ColumnIndex() / FieldIndex() node argument by reference
4398d58 [Antoine Pitrou] ARROW-1974: Fix creating Arrow schema with duplicate column names
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 72e65d4..f06f4a8 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1669,6 +1669,27 @@
   }
 }
 
+TEST(TestArrowReadWrite, TableWithDuplicateColumns) {
+  // See ARROW-1974
+  using ::arrow::ArrayFromVector;
+
+  auto f0 = field("duplicate", ::arrow::int8());
+  auto f1 = field("duplicate", ::arrow::int16());
+  auto schema = ::arrow::schema({f0, f1});
+
+  std::vector<int8_t> a0_values = {1, 2, 3};
+  std::vector<int16_t> a1_values = {14, 15, 16};
+
+  std::shared_ptr<Array> a0, a1;
+
+  ArrayFromVector<::arrow::Int8Type, int8_t>(a0_values, &a0);
+  ArrayFromVector<::arrow::Int16Type, int16_t>(a1_values, &a1);
+
+  auto table = Table::Make(schema, {std::make_shared<Column>(f0->name(), a0),
+                                    std::make_shared<Column>(f1->name(), a1)});
+  CheckSimpleRoundtrip(table, table->num_rows());
+}
+
 TEST(TestArrowWrite, CheckChunkSize) {
   const int num_columns = 2;
   const int num_rows = 128;
diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc
index d502d24..da6af52 100644
--- a/src/parquet/arrow/arrow-schema-test.cc
+++ b/src/parquet/arrow/arrow-schema-test.cc
@@ -165,6 +165,31 @@
   CheckFlatSchema(arrow_schema);
 }
 
+TEST_F(TestConvertParquetSchema, DuplicateFieldNames) {
+  std::vector<NodePtr> parquet_fields;
+  std::vector<std::shared_ptr<Field>> arrow_fields;
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("xxx", Repetition::REQUIRED, ParquetType::BOOLEAN));
+  auto arrow_field1 = std::make_shared<Field>("xxx", BOOL, false);
+
+  parquet_fields.push_back(
+      PrimitiveNode::Make("xxx", Repetition::REQUIRED, ParquetType::INT32));
+  auto arrow_field2 = std::make_shared<Field>("xxx", INT32, false);
+
+  ASSERT_OK(ConvertSchema(parquet_fields));
+  arrow_fields = {arrow_field1, arrow_field2};
+  CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields));
+
+  ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>({0, 1})));
+  arrow_fields = {arrow_field1, arrow_field2};
+  CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields));
+
+  ASSERT_OK(ConvertSchema(parquet_fields, std::vector<int>({1, 0})));
+  arrow_fields = {arrow_field2, arrow_field1};
+  CheckFlatSchema(std::make_shared<::arrow::Schema>(arrow_fields));
+}
+
 TEST_F(TestConvertParquetSchema, ParquetKeyValueMetadata) {
   std::vector<NodePtr> parquet_fields;
   std::vector<std::shared_ptr<Field>> arrow_fields;
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index bd68ec3..78c3225 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -443,7 +443,7 @@
 }
 
 Status FileReader::Impl::ReadTable(const std::vector<int>& indices,
-                                   std::shared_ptr<Table>* table) {
+                                   std::shared_ptr<Table>* out) {
   std::shared_ptr<::arrow::Schema> schema;
   RETURN_NOT_OK(GetSchema(indices, &schema));
 
@@ -473,7 +473,9 @@
     RETURN_NOT_OK(ParallelFor(nthreads, num_fields, ReadColumnFunc));
   }
 
-  *table = Table::Make(schema, columns);
+  std::shared_ptr<Table> table = Table::Make(schema, columns);
+  RETURN_NOT_OK(table->Validate());
+  *out = table;
   return Status::OK();
 }
 
diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc
index c8cce9f..ec9aff4 100644
--- a/src/parquet/schema-test.cc
+++ b/src/parquet/schema-test.cc
@@ -292,6 +292,17 @@
 
     return fields;
   }
+
+  NodeVector Fields2() {
+    // Fields with a duplicate name
+    NodeVector fields;
+
+    fields.push_back(Int32("duplicate", Repetition::REQUIRED));
+    fields.push_back(Int64("unique"));
+    fields.push_back(Double("duplicate"));
+
+    return fields;
+  }
 };
 
 TEST_F(TestGroupNode, Attrs) {
@@ -346,14 +357,23 @@
   GroupNode group("group", Repetition::REQUIRED, fields);
   for (size_t i = 0; i < fields.size(); i++) {
     auto field = group.field(static_cast<int>(i));
-    ASSERT_EQ(i, group.FieldIndex(*field.get()));
+    ASSERT_EQ(i, group.FieldIndex(*field));
   }
 
   // Test a non field node
   auto non_field_alien = Int32("alien", Repetition::REQUIRED);   // other name
   auto non_field_familiar = Int32("one", Repetition::REPEATED);  // other node
-  ASSERT_TRUE(group.FieldIndex(*non_field_alien.get()) < 0);
-  ASSERT_TRUE(group.FieldIndex(*non_field_familiar.get()) < 0);
+  ASSERT_TRUE(group.FieldIndex(*non_field_alien) < 0);
+  ASSERT_TRUE(group.FieldIndex(*non_field_familiar) < 0);
+}
+
+TEST_F(TestGroupNode, FieldIndexDuplicateName) {
+  NodeVector fields = Fields2();
+  GroupNode group("group", Repetition::REQUIRED, fields);
+  for (size_t i = 0; i < fields.size(); i++) {
+    auto field = group.field(static_cast<int>(i));
+    ASSERT_EQ(i, group.FieldIndex(*field));
+  }
 }
 
 // ----------------------------------------------------------------------
@@ -677,14 +697,14 @@
 
   for (int i = 0; i < nleaves; ++i) {
     auto col = descr_.Column(i);
-    ASSERT_EQ(i, descr_.ColumnIndex(*col->schema_node().get()));
+    ASSERT_EQ(i, descr_.ColumnIndex(*col->schema_node()));
   }
 
   // Test non-column nodes find
   NodePtr non_column_alien = Int32("alien", Repetition::REQUIRED);  // other path
   NodePtr non_column_familiar = Int32("a", Repetition::REPEATED);   // other node
-  ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien.get()) < 0);
-  ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar.get()) < 0);
+  ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien) < 0);
+  ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar) < 0);
 
   ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0));
   ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3));
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index cbe72c6..5c3958e 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -273,16 +273,14 @@
 }
 
 int GroupNode::FieldIndex(const Node& node) const {
-  int result = FieldIndex(node.name());
-  if (result < 0) {
-    return -1;
+  auto search = field_name_to_idx_.equal_range(node.name());
+  for (auto it = search.first; it != search.second; ++it) {
+    const int idx = it->second;
+    if (&node == field(idx).get()) {
+      return idx;
+    }
   }
-  DCHECK(result < field_count());
-  if (!node.Equals(field(result).get())) {
-    // Same name but not the same node
-    return -1;
-  }
-  return result;
+  return -1;
 }
 
 void GroupNode::Visit(Node::Visitor* visitor) { visitor->Visit(this); }
@@ -721,16 +719,14 @@
 }
 
 int SchemaDescriptor::ColumnIndex(const Node& node) const {
-  int result = ColumnIndex(node.path()->ToDotString());
-  if (result < 0) {
-    return -1;
+  auto search = leaf_to_idx_.equal_range(node.path()->ToDotString());
+  for (auto it = search.first; it != search.second; ++it) {
+    const int idx = it->second;
+    if (&node == Column(idx)->schema_node().get()) {
+      return idx;
+    }
   }
-  DCHECK(result < num_columns());
-  if (!node.Equals(Column(result)->schema_node().get())) {
-    // Same path but not the same node
-    return -1;
-  }
-  return result;
+  return -1;
 }
 
 const schema::Node* SchemaDescriptor::GetColumnRoot(int i) const {
diff --git a/src/parquet/schema.h b/src/parquet/schema.h
index 7b6793b..b778e51 100644
--- a/src/parquet/schema.h
+++ b/src/parquet/schema.h
@@ -264,7 +264,11 @@
   bool Equals(const Node* other) const override;
 
   NodePtr field(int i) const { return fields_[i]; }
+  // Get the index of a field by its name, or negative value if not found.
+  // If several fields share the same name, it is unspecified which one
+  // is returned.
   int FieldIndex(const std::string& name) const;
+  // Get the index of a field by its node, or negative value if not found.
   int FieldIndex(const Node& node) const;
 
   int field_count() const { return static_cast<int>(fields_.size()); }
@@ -282,7 +286,7 @@
     auto field_idx = 0;
     for (NodePtr& field : fields_) {
       field->SetParent(this);
-      field_name_to_idx_[field->name()] = field_idx++;
+      field_name_to_idx_.emplace(field->name(), field_idx++);
     }
   }
 
@@ -290,11 +294,12 @@
   bool EqualsInternal(const GroupNode* other) const;
 
   // Mapping between field name to the field index
-  std::unordered_map<std::string, int> field_name_to_idx_;
+  std::unordered_multimap<std::string, int> field_name_to_idx_;
 
   FRIEND_TEST(TestGroupNode, Attrs);
   FRIEND_TEST(TestGroupNode, Equals);
   FRIEND_TEST(TestGroupNode, FieldIndex);
+  FRIEND_TEST(TestGroupNode, FieldIndexDuplicateName);
 };
 
 // ----------------------------------------------------------------------
@@ -393,9 +398,11 @@
 
   const ColumnDescriptor* Column(int i) const;
 
-  // Get the index of a column by its dotstring path, or negative value if not found
+  // Get the index of a column by its dotstring path, or negative value if not found.
+  // If several columns share the same dotstring path, it is unspecified which one
+  // is returned.
   int ColumnIndex(const std::string& node_path) const;
-  // Get the index of a column by its node, or negative value if not found
+  // Get the index of a column by its node, or negative value if not found.
   int ColumnIndex(const schema::Node& node) const;
 
   bool Equals(const SchemaDescriptor& other) const;
@@ -442,7 +449,7 @@
   std::unordered_map<int, const schema::NodePtr> leaf_to_base_;
 
   // Mapping between ColumnPath DotString to the leaf index
-  std::unordered_map<std::string, int> leaf_to_idx_;
+  std::unordered_multimap<std::string, int> leaf_to_idx_;
 };
 
 }  // namespace parquet
diff --git a/src/parquet/util/schema-util.h b/src/parquet/util/schema-util.h
index 4e31d3c..1c66f67 100644
--- a/src/parquet/util/schema-util.h
+++ b/src/parquet/util/schema-util.h
@@ -71,7 +71,7 @@
   out->clear();
   for (auto& column_idx : column_indices) {
     auto field_node = descr.GetColumnRoot(column_idx);
-    auto field_idx = group->FieldIndex(field_node->name());
+    auto field_idx = group->FieldIndex(*field_node);
     if (field_idx < 0) {
       return false;
     }