Avro: Add Avro-assisted name mapping (#7392)
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java
index e06c774..28cde9b 100644
--- a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java
@@ -93,14 +93,33 @@
private static <P, T> T visitUnion(
P type, Schema union, AvroWithPartnerByStructureVisitor<P, T> visitor) {
List<Schema> types = union.getTypes();
- Preconditions.checkArgument(
- AvroSchemaUtil.isOptionSchema(union), "Cannot visit non-option union: %s", union);
List<T> options = Lists.newArrayListWithExpectedSize(types.size());
- for (Schema branch : types) {
- if (branch.getType() == Schema.Type.NULL) {
- options.add(visit(visitor.nullType(), branch, visitor));
- } else {
- options.add(visit(type, branch, visitor));
+ if (AvroSchemaUtil.isOptionSchema(union)) {
+ for (Schema branch : types) {
+ if (branch.getType() == Schema.Type.NULL) {
+ options.add(visit(visitor.nullType(), branch, visitor));
+ } else {
+ options.add(visit(type, branch, visitor));
+ }
+ }
+ } else {
+ boolean encounteredNull = false;
+ for (int i = 0; i < types.size(); i++) {
+ // For a union-type (a, b, NULL, c) and the corresponding struct type (tag, a, b, c), the
+ // types match according to the following pattern:
+ // Before NULL, branch type i in the union maps to struct field i + 1.
+ // After NULL, branch type i in the union maps to struct field i.
+ int structFieldIndex = (encounteredNull) ? i : i + 1;
+ if (types.get(i).getType() == Schema.Type.NULL) {
+ visit(visitor.nullType(), types.get(i), visitor);
+ encounteredNull = true;
+ } else {
+ options.add(
+ visit(
+ visitor.fieldNameAndType(type, structFieldIndex).second(),
+ types.get(i),
+ visitor));
+ }
}
}
return visitor.union(type, union, options);
diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithTypeByStructureVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithTypeByStructureVisitor.java
new file mode 100644
index 0000000..f068da1
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithTypeByStructureVisitor.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+
+public class AvroWithTypeByStructureVisitor<T> extends AvroWithPartnerByStructureVisitor<Type, T> {
+ @Override
+ protected boolean isMapType(Type type) {
+ return type.isMapType();
+ }
+
+ @Override
+ protected boolean isStringType(Type type) {
+ return type.isPrimitiveType() && type.asPrimitiveType().typeId() == Type.TypeID.STRING;
+ }
+
+ @Override
+ protected Type arrayElementType(Type arrayType) {
+ return arrayType.asListType().elementType();
+ }
+
+ @Override
+ protected Type mapKeyType(Type mapType) {
+ return mapType.asMapType().keyType();
+ }
+
+ @Override
+ protected Type mapValueType(Type mapType) {
+ return mapType.asMapType().valueType();
+ }
+
+ @Override
+ protected Pair<String, Type> fieldNameAndType(Type structType, int pos) {
+ Types.NestedField field = structType.asStructType().fields().get(pos);
+ return Pair.of(field.name(), field.type());
+ }
+
+ @Override
+ protected Type nullType() {
+ return null;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/NameMappingWithAvroSchema.java b/core/src/main/java/org/apache/iceberg/avro/NameMappingWithAvroSchema.java
new file mode 100644
index 0000000..96892ee
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/avro/NameMappingWithAvroSchema.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import java.util.List;
+import org.apache.avro.Schema;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.MappedFields;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+public class NameMappingWithAvroSchema extends AvroWithTypeByStructureVisitor<MappedFields> {
+ @Override
+ public MappedFields record(
+ Type struct, Schema record, List<String> names, List<MappedFields> fieldResults) {
+ List<MappedField> fields = Lists.newArrayListWithExpectedSize(fieldResults.size());
+
+ for (int i = 0; i < fieldResults.size(); i += 1) {
+ Types.NestedField field = struct.asStructType().fields().get(i);
+ MappedFields result = fieldResults.get(i);
+ fields.add(MappedField.of(field.fieldId(), field.name(), result));
+ }
+
+ return MappedFields.of(fields);
+ }
+
+ @Override
+ public MappedFields union(Type type, Schema union, List<MappedFields> optionResults) {
+ if (AvroSchemaUtil.isOptionSchema(union)) {
+ for (int i = 0; i < optionResults.size(); i += 1) {
+ if (union.getTypes().get(i).getType() != Schema.Type.NULL) {
+ return optionResults.get(i);
+ }
+ }
+ } else { // Complex union
+ Preconditions.checkArgument(
+ type instanceof Types.StructType,
+ "Cannot visit invalid Iceberg type: %s for Avro complex union type: %s",
+ type,
+ union);
+ Types.StructType struct = (Types.StructType) type;
+ List<MappedField> fields = Lists.newArrayListWithExpectedSize(optionResults.size());
+ int index = 0;
+ // Avro spec for union types states that unions may not contain more than one schema with the
+ // same type, except for the named types record, fixed and enum. For example, unions
+ // containing two array types or two map types are not permitted, but two named types with
+ // different names are permitted.
+ // Therefore, for non-named types, use the Avro type toString() as the field mapping key. For
+ // named types, use the record name of the Avro type as the field mapping key.
+ for (Schema option : union.getTypes()) {
+ if (option.getType() != Schema.Type.NULL) {
+ // Check if current option is a named type, i.e., a RECORD, ENUM, or FIXED type. If so,
+ // use the record name of the Avro type as the field name. Otherwise, use the Avro
+ // type toString().
+ if (option.getType() == Schema.Type.RECORD
+ || option.getType() == Schema.Type.ENUM
+ || option.getType() == Schema.Type.FIXED) {
+ fields.add(
+ MappedField.of(
+ struct.fields().get(index).fieldId(),
+ option.getName(),
+ optionResults.get(index)));
+ } else {
+ fields.add(
+ MappedField.of(
+ struct.fields().get(index).fieldId(),
+ option.getType().getName(),
+ optionResults.get(index)));
+ }
+
+ // Both iStruct and optionResults do not contain an entry for the NULL type, so we need to
+ // increment i only when we encounter a non-NULL type.
+ index++;
+ }
+ }
+ return MappedFields.of(fields);
+ }
+ return null;
+ }
+
+ @Override
+ public MappedFields array(Type list, Schema array, MappedFields elementResult) {
+ return MappedFields.of(MappedField.of(list.asListType().elementId(), "element", elementResult));
+ }
+
+ @Override
+ public MappedFields map(Type sMap, Schema map, MappedFields keyResult, MappedFields valueResult) {
+ return MappedFields.of(
+ MappedField.of(sMap.asMapType().keyId(), "key", keyResult),
+ MappedField.of(sMap.asMapType().valueId(), "value", valueResult));
+ }
+
+ @Override
+ public MappedFields map(Type sMap, Schema map, MappedFields valueResult) {
+ return MappedFields.of(
+ MappedField.of(sMap.asMapType().keyId(), "key", null),
+ MappedField.of(sMap.asMapType().valueId(), "value", valueResult));
+ }
+
+ @Override
+ public MappedFields primitive(Type type, Schema primitive) {
+ return null; // no mapping because primitives have no nested fields
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
index 174d639..352fe22 100644
--- a/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
+++ b/core/src/main/java/org/apache/iceberg/avro/SchemaToType.java
@@ -104,13 +104,24 @@
@Override
public Type union(Schema union, List<Type> options) {
- Preconditions.checkArgument(
- AvroSchemaUtil.isOptionSchema(union), "Unsupported type: non-option union: %s", union);
- // records, arrays, and maps will check nullability later
- if (options.get(0) == null) {
- return options.get(1);
+ if (AvroSchemaUtil.isOptionSchema(union)) {
+ if (options.get(0) == null) {
+ return options.get(1);
+ } else {
+ return options.get(0);
+ }
} else {
- return options.get(0);
+ // Create list of Iceberg schema fields
+ List<Types.NestedField> fields = Lists.newArrayListWithExpectedSize(options.size());
+ int tagIndex = 0;
+ fields.add(Types.NestedField.required(allocateId(), "tag", Types.IntegerType.get()));
+ for (Type option : options) {
+ if (option != null) {
+ fields.add(Types.NestedField.optional(allocateId(), "field" + tagIndex, option));
+ tagIndex++;
+ }
+ }
+ return Types.StructType.of(fields);
}
}
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestNameMappingWithAvroSchema.java b/core/src/test/java/org/apache/iceberg/avro/TestNameMappingWithAvroSchema.java
new file mode 100644
index 0000000..9c8ea84
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/avro/TestNameMappingWithAvroSchema.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.avro;
+
+import org.apache.avro.Schema;
+import org.apache.iceberg.mapping.MappedField;
+import org.apache.iceberg.mapping.MappedFields;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestNameMappingWithAvroSchema {
+ @Test
+ public void testNameMappingWithAvroSchema() {
+
+ // Create an example Avro schema with a nested record but not using the SchemaBuilder
+ Schema schema =
+ Schema.createRecord(
+ "test",
+ null,
+ null,
+ false,
+ Lists.newArrayList(
+ new Schema.Field("id", Schema.create(Schema.Type.INT)),
+ new Schema.Field("data", Schema.create(Schema.Type.STRING)),
+ new Schema.Field(
+ "location",
+ Schema.createRecord(
+ "location",
+ null,
+ null,
+ false,
+ Lists.newArrayList(
+ new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE)),
+ new Schema.Field("long", Schema.create(Schema.Type.DOUBLE))))),
+ new Schema.Field("friends", Schema.createArray(Schema.create(Schema.Type.STRING))),
+ new Schema.Field(
+ "simpleUnion",
+ Schema.createUnion(
+ Lists.newArrayList(
+ Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)))),
+ new Schema.Field(
+ "complexUnion",
+ Schema.createUnion(
+ new Schema[] {
+ Schema.create(Schema.Type.NULL),
+ Schema.create(Schema.Type.STRING),
+ Schema.createRecord(
+ "innerRecord1",
+ null,
+ "namespace1",
+ false,
+ Lists.newArrayList(
+ new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE)),
+ new Schema.Field("long", Schema.create(Schema.Type.DOUBLE)))),
+ Schema.createRecord(
+ "innerRecord2",
+ null,
+ "namespace2",
+ false,
+ Lists.newArrayList(
+ new Schema.Field("lat", Schema.create(Schema.Type.DOUBLE)),
+ new Schema.Field("long", Schema.create(Schema.Type.DOUBLE)))),
+ Schema.createRecord(
+ "innerRecord3",
+ null,
+ "namespace3",
+ false,
+ Lists.newArrayList(
+ new Schema.Field(
+ "innerUnion",
+ Schema.createUnion(
+ Lists.newArrayList(
+ Schema.create(Schema.Type.STRING),
+ Schema.create(Schema.Type.INT)))))),
+ Schema.createEnum(
+ "timezone", null, null, Lists.newArrayList("UTC", "PST", "EST")),
+ Schema.createFixed("bitmap", null, null, 1)
+ }))));
+
+ NameMappingWithAvroSchema nameMappingWithAvroSchema = new NameMappingWithAvroSchema();
+
+ // Convert Avro schema to Iceberg schema
+ org.apache.iceberg.Schema icebergSchema = AvroSchemaUtil.toIceberg(schema);
+ MappedFields expected =
+ MappedFields.of(
+ MappedField.of(0, "id"),
+ MappedField.of(1, "data"),
+ MappedField.of(
+ 2,
+ "location",
+ MappedFields.of(MappedField.of(6, "lat"), MappedField.of(7, "long"))),
+ MappedField.of(3, "friends", MappedFields.of(MappedField.of(8, "element"))),
+ MappedField.of(4, "simpleUnion"),
+ MappedField.of(
+ 5,
+ "complexUnion",
+ MappedFields.of(
+ MappedField.of(17, "string"),
+ MappedField.of(
+ 18,
+ "innerRecord1",
+ MappedFields.of(MappedField.of(9, "lat"), MappedField.of(10, "long"))),
+ MappedField.of(
+ 19,
+ "innerRecord2",
+ MappedFields.of(MappedField.of(11, "lat"), MappedField.of(12, "long"))),
+ MappedField.of(
+ 20,
+ "innerRecord3",
+ MappedFields.of(
+ MappedField.of(
+ 16,
+ "innerUnion",
+ MappedFields.of(
+ MappedField.of(13, "string"), MappedField.of(14, "int"))))),
+ MappedField.of(21, "timezone"),
+ MappedField.of(22, "bitmap"))));
+ Assert.assertEquals(
+ expected,
+ AvroWithPartnerByStructureVisitor.visit(
+ icebergSchema.asStruct(), schema, nameMappingWithAvroSchema));
+ }
+}