Avro: Add Avro-assisted name mapping (#7392)

diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java
index e06c774..28cde9b 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java
@@ -93,14 +93,33 @@
   private static <P, T> T visitUnion(
       P type, Schema union, AvroWithPartnerByStructureVisitor<P, T> visitor) {
     List<Schema> types = union.getTypes();
-    Preconditions.checkArgument(
-        AvroSchemaUtil.isOptionSchema(union), "Cannot visit non-option union: %s", union);
     List<T> options = Lists.newArrayListWithExpectedSize(types.size());
-    for (Schema branch : types) {
-      if (branch.getType() == Schema.Type.NULL) {
-        options.add(visit(visitor.nullType(), branch, visitor));
-      } else {
-        options.add(visit(type, branch, visitor));
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (Schema branch : types) {
+        if (branch.getType() == Schema.Type.NULL) {
+          options.add(visit(visitor.nullType(), branch, visitor));
+        } else {
+          options.add(visit(type, branch, visitor));
+        }
+      }
+    } else {
+      boolean encounteredNull = false;
+      for (int i = 0; i < types.size(); i++) {
+        // For a union-type (a, b, NULL, c) and the corresponding struct type (tag, a, b, c), the
+        // types match according to the following pattern:
+        // Before NULL, branch type i in the union maps to struct field i + 1.
+        // After NULL, branch type i in the union maps to struct field i.
+        int structFieldIndex = (encounteredNull) ? i : i + 1;
+        if (types.get(i).getType() == Schema.Type.NULL) {
+          visit(visitor.nullType(), types.get(i), visitor);
+          encounteredNull = true;
+        } else {
+          options.add(
+              visit(
+                  visitor.fieldNameAndType(type, structFieldIndex).second(),
+                  types.get(i),
+                  visitor));
+        }
       }
     }
     return visitor.union(type, union, options);
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithTypeByStructureVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithTypeByStructureVisitor.java
new file mode 100644
index 0000000..f068da1
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithTypeByStructureVisitor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+
+public class AvroWithTypeByStructureVisitor<T> extends AvroWithPartnerByStructureVisitor<Type, T> {
+  @Override
+  protected boolean isMapType(Type type) {
+    return type.isMapType();
+  }
+
+  @Override
+  protected boolean isStringType(Type type) {
+    return type.isPrimitiveType() && type.asPrimitiveType().typeId() == Type.TypeID.STRING;
+  }
+
+  @Override
+  protected Type arrayElementType(Type arrayType) {
+    return arrayType.asListType().elementType();
+  }
+
+  @Override
+  protected Type mapKeyType(Type mapType) {
+    return mapType.asMapType().keyType();
+  }
+
+  @Override
+  protected Type mapValueType(Type mapType) {
+    return mapType.asMapType().valueType();
+  }
+
+  @Override
+  protected Pair<String, Type> fieldNameAndType(Type structType, int pos) {
+    Types.NestedField field = structType.asStructType().fields().get(pos);
+    return Pair.of(field.name(), field.type());
+  }
+
+  @Override
+  protected Type nullType() {
+    return null;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/NameMappingWithAvroSchema.java b/core/src/main/java/org/apache/iceberg/avro/NameMappingWithAvroSchema.java
new file mode 100644
index 0000000..96892ee
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/NameMappingWithAvroSchema.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.MappedFields;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+public class NameMappingWithAvroSchema extends AvroWithTypeByStructureVisitor<MappedFields> {
+  @Override
+  public MappedFields record(
+      Type struct, Schema record, List<String> names, List<MappedFields> fieldResults) {
+    List<MappedField> fields = Lists.newArrayListWithExpectedSize(fieldResults.size());
+
+    for (int i = 0; i < fieldResults.size(); i += 1) {
+      Types.NestedField field = struct.asStructType().fields().get(i);
+      MappedFields result = fieldResults.get(i);
+      fields.add(MappedField.of(field.fieldId(), field.name(), result));
+    }
+
+    return MappedFields.of(fields);
+  }
+
+  @Override
+  public MappedFields union(Type type, Schema union, List<MappedFields> optionResults) {
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      for (int i = 0; i < optionResults.size(); i += 1) {
+        if (union.getTypes().get(i).getType() != Schema.Type.NULL) {
+          return optionResults.get(i);
+        }
+      }
+    } else { // Complex union
+      Preconditions.checkArgument(
+          type instanceof Types.StructType,
+          "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+          type,
+          union);
+      Types.StructType struct = (Types.StructType) type;
+      List<MappedField> fields = Lists.newArrayListWithExpectedSize(optionResults.size());
+      int index = 0;
+      // Avro spec for union types states that unions may not contain more than one schema with the
+      // same type, except for the named types record, fixed and enum. For example, unions
+      // containing two array types or two map types are not permitted, but two named types with
+      // different names are permitted.
+      // Therefore, for non-named types, use the Avro type toString() as the field mapping key. For
+      // named types, use the record name of the Avro type as the field mapping key.
+      for (Schema option : union.getTypes()) {
+        if (option.getType() != Schema.Type.NULL) {
+          // Check if current option is a named type, i.e., a RECORD, ENUM, or FIXED type. If so,
+          // use the record name of the Avro type as the field name. Otherwise, use the Avro
+          // type toString().
+          if (option.getType() == Schema.Type.RECORD
+              || option.getType() == Schema.Type.ENUM
+              || option.getType() == Schema.Type.FIXED) {
+            fields.add(
+                MappedField.of(
+                    struct.fields().get(index).fieldId(),
+                    option.getName(),
+                    optionResults.get(index)));
+          } else {
+            fields.add(
+                MappedField.of(
+                    struct.fields().get(index).fieldId(),
+                    option.getType().getName(),
+                    optionResults.get(index)));
+          }
+
+          // Both iStruct and optionResults do not contain an entry for the NULL type, so we need to
+          // increment i only when we encounter a non-NULL type.
+          index++;
+        }
+      }
+      return MappedFields.of(fields);
+    }
+    return null;
+  }
+
+  @Override
+  public MappedFields array(Type list, Schema array, MappedFields elementResult) {
+    return MappedFields.of(MappedField.of(list.asListType().elementId(), "element", elementResult));
+  }
+
+  @Override
+  public MappedFields map(Type sMap, Schema map, MappedFields keyResult, MappedFields valueResult) {
+    return MappedFields.of(
+        MappedField.of(sMap.asMapType().keyId(), "key", keyResult),
+        MappedField.of(sMap.asMapType().valueId(), "value", valueResult));
+  }
+
+  @Override
+  public MappedFields map(Type sMap, Schema map, MappedFields valueResult) {
+    return MappedFields.of(
+        MappedField.of(sMap.asMapType().keyId(), "key", null),
+        MappedField.of(sMap.asMapType().valueId(), "value", valueResult));
+  }
+
+  @Override
+  public MappedFields primitive(Type type, Schema primitive) {
+    return null; // no mapping because primitives have no nested fields
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
index 174d639..352fe22 100644
--- a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
+++ b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
@@ -104,13 +104,24 @@
 
   @Override
   public Type union(Schema union, List<Type> options) {
-    Preconditions.checkArgument(
-        AvroSchemaUtil.isOptionSchema(union), "Unsupported type: non-option union: %s", union);
-    // records, arrays, and maps will check nullability later
-    if (options.get(0) == null) {
-      return options.get(1);
+    if (AvroSchemaUtil.isOptionSchema(union)) {
+      if (options.get(0) == null) {
+        return options.get(1);
+      } else {
+        return options.get(0);
+      }
     } else {
-      return options.get(0);
+      // Create list of Iceberg schema fields
+      List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(options.size());
+      int tagIndex = 0;
+      fields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));
+      for (Type option : options) {
+        if (option != null) {
+          fields.add(Types.NestedField.optional(allocateId(), "field" + tagIndex, option));
+          tagIndex++;
+        }
+      }
+      return Types.StructType.of(fields);
     }
   }
 
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestNameMappingWithAvroSchema.java b/core/src/test/java/org/apache/iceberg/avro/TestNameMappingWithAvroSchema.java
new file mode 100644
index 0000000..9c8ea84
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/avro/TestNameMappingWithAvroSchema.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import org.apache.avro.Schema;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.MappedFields;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestNameMappingWithAvroSchema {
+  @Test
+  public void testNameMappingWithAvroSchema() {
+
+    // Create an example Avro schema with a nested record but not using the SchemaBuilder
+    Schema schema =
+        Schema.createRecord(
+            "test",
+            null,
+            null,
+            false,
+            Lists.newArrayList(
+                new Schema.Field("id", Schema.create(Schema.Type.INT)),
+                new Schema.Field("data", Schema.create(Schema.Type.STRING)),
+                new Schema.Field(
+                    "location",
+                    Schema.createRecord(
+                        "location",
+                        null,
+                        null,
+                        false,
+                        Lists.newArrayList(
+                            new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE)),
+                            new Schema.Field("long", Schema.create(Schema.Type.DOUBLE))))),
+                new Schema.Field("friends", Schema.createArray(Schema.create(Schema.Type.STRING))),
+                new Schema.Field(
+                    "simpleUnion",
+                    Schema.createUnion(
+                        Lists.newArrayList(
+                            Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)))),
+                new Schema.Field(
+                    "complexUnion",
+                    Schema.createUnion(
+                        new Schema[] {
+                          Schema.create(Schema.Type.NULL),
+                          Schema.create(Schema.Type.STRING),
+                          Schema.createRecord(
+                              "innerRecord1",
+                              null,
+                              "namespace1",
+                              false,
+                              Lists.newArrayList(
+                                  new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE)),
+                                  new Schema.Field("long", Schema.create(Schema.Type.DOUBLE)))),
+                          Schema.createRecord(
+                              "innerRecord2",
+                              null,
+                              "namespace2",
+                              false,
+                              Lists.newArrayList(
+                                  new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE)),
+                                  new Schema.Field("long", Schema.create(Schema.Type.DOUBLE)))),
+                          Schema.createRecord(
+                              "innerRecord3",
+                              null,
+                              "namespace3",
+                              false,
+                              Lists.newArrayList(
+                                  new Schema.Field(
+                                      "innerUnion",
+                                      Schema.createUnion(
+                                          Lists.newArrayList(
+                                              Schema.create(Schema.Type.STRING),
+                                              Schema.create(Schema.Type.INT)))))),
+                          Schema.createEnum(
+                              "timezone", null, null, Lists.newArrayList("UTC", "PST", "EST")),
+                          Schema.createFixed("bitmap", null, null, 1)
+                        }))));
+
+    NameMappingWithAvroSchema nameMappingWithAvroSchema = new NameMappingWithAvroSchema();
+
+    // Convert Avro schema to Iceberg schema
+    org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(schema);
+    MappedFields expected =
+        MappedFields.of(
+            MappedField.of(0, "id"),
+            MappedField.of(1, "data"),
+            MappedField.of(
+                2,
+                "location",
+                MappedFields.of(MappedField.of(6, "lat"), MappedField.of(7, "long"))),
+            MappedField.of(3, "friends", MappedFields.of(MappedField.of(8, "element"))),
+            MappedField.of(4, "simpleUnion"),
+            MappedField.of(
+                5,
+                "complexUnion",
+                MappedFields.of(
+                    MappedField.of(17, "string"),
+                    MappedField.of(
+                        18,
+                        "innerRecord1",
+                        MappedFields.of(MappedField.of(9, "lat"), MappedField.of(10, "long"))),
+                    MappedField.of(
+                        19,
+                        "innerRecord2",
+                        MappedFields.of(MappedField.of(11, "lat"), MappedField.of(12, "long"))),
+                    MappedField.of(
+                        20,
+                        "innerRecord3",
+                        MappedFields.of(
+                            MappedField.of(
+                                16,
+                                "innerUnion",
+                                MappedFields.of(
+                                    MappedField.of(13, "string"), MappedField.of(14, "int"))))),
+                    MappedField.of(21, "timezone"),
+                    MappedField.of(22, "bitmap"))));
+    Assert.assertEquals(
+        expected,
+        AvroWithPartnerByStructureVisitor.visit(
+            icebergSchema.asStruct(), schema, nameMappingWithAvroSchema));
+  }
+}