blob: c5c78dd1472a61a6169a6e49c1c6e7be58329160 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.avro;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
/**
 * Derives an Avro read schema from a file's Avro schema and the current (expected) table type.
 *
 * <p>The file schema is walked while the matching expected Iceberg type is tracked in {@code
 * current}. Record fields are matched to expected fields by field ID, so the resulting read schema
 * translates the field names written in the file to the names of the current table schema.
 *
 * <p>Records whose full name is a key of the rename map are also renamed, to support custom read
 * classes.
 */
class BuildAvroProjection extends AvroCustomOrderSchemaVisitor<Schema, Schema.Field> {
  /** Replacement names for records, looked up by the record's full name in the file schema. */
  private final Map<String, String> renames;

  /** Expected Iceberg type of the node being visited; swapped while descending, then restored. */
  private Type current;

  BuildAvroProjection(org.apache.iceberg.Schema expectedSchema, Map<String, String> renames) {
    this(expectedSchema.asStruct(), renames);
  }

  BuildAvroProjection(Type expectedType, Map<String, String> renames) {
    this.renames = renames;
    this.current = expectedType;
  }

  /**
   * Projects a file record onto the expected struct.
   *
   * <p>The returned record lists its fields in the order of the expected struct. Expected fields
   * with no projected counterpart are filled in with null-defaulted placeholders. The original
   * record instance is returned only when nothing changed and no rename is registered for it.
   *
   * @param record the record schema from the file
   * @param names not used by this visitor
   * @param schemaIterable per-field results, positionally aligned with the record's fields; an
   *     entry is null when that column was not projected
   * @return the original record, or a copy carrying the projected fields and optional new name
   * @throws IllegalArgumentException if the expected type is not a struct, or if a missing
   *     expected field is neither optional nor a metadata column
   */
  @Override
  @SuppressWarnings("checkstyle:CyclomaticComplexity")
  public Schema record(Schema record, List<String> names, Iterable<Schema.Field> schemaIterable) {
    Preconditions.checkArgument(
        current.isNestedType() && current.asNestedType().isStructType(),
        "Cannot project non-struct: %s",
        current);
    Types.StructType expectedStruct = current.asNestedType().asStructType();

    List<Schema.Field> fileFields = record.getFields();
    List<Schema.Field> projected = Lists.newArrayList(schemaIterable);

    // index the projected fields by name and note whether any of them differs from the file
    boolean changed = false;
    Map<String, Schema.Field> projectedByName = Maps.newHashMap();
    for (int pos = 0; pos < fileFields.size(); pos += 1) {
      Schema.Field result = projected.get(pos);
      if (result == null) {
        // column was not projected
        changed = true;
        continue;
      }

      projectedByName.put(result.name(), result);

      Schema.Field original = fileFields.get(pos);
      boolean identical =
          result.schema().equals(original.schema()) && result.name().equals(original.name());
      if (!identical) {
        changed = true;
      }
    }

    // assemble the fields in the order of the expected struct
    List<Types.NestedField> expectedFields = expectedStruct.fields();
    List<Schema.Field> ordered = Lists.newArrayListWithExpectedSize(expectedFields.size());
    int index = 0;
    for (Types.NestedField expected : expectedFields) {
      // a different name at the same position is treated as a reordering
      if (index < fileFields.size() && !expected.name().equals(fileFields.get(index).name())) {
        changed = true;
      }
      index += 1;

      String avroName = AvroSchemaUtil.makeCompatibleName(expected.name());
      Schema.Field match = projectedByName.get(avroName);
      if (match == null) {
        match = nullDefaultField(expected, avroName);
        changed = true;
      }
      ordered.add(match);
    }

    String fullName = record.getFullName();
    if (!changed && !renames.containsKey(fullName)) {
      return record;
    }

    return AvroSchemaUtil.copyRecord(record, ordered, renames.get(fullName));
  }

  /**
   * Builds the placeholder used for an expected field that has no projected field in the file.
   *
   * <p>The placeholder has an optional schema with a null default. Its name carries a {@code
   * _r<fieldId>} suffix so that, even if records in the file contain a field with the plain name,
   * that field is not projected.
   *
   * @param expected the expected field that is missing from the projection
   * @param avroName the Avro-compatible form of the expected field's name
   * @return a new null-defaulted field tagged with the expected field ID
   * @throws IllegalArgumentException if the field is neither optional nor a metadata column
   */
  private static Schema.Field nullDefaultField(Types.NestedField expected, String avroName) {
    Preconditions.checkArgument(
        expected.isOptional() || MetadataColumns.metadataFieldIds().contains(expected.fieldId()),
        "Missing required field: %s",
        expected.name());

    Schema.Field placeholder =
        new Schema.Field(
            avroName + "_r" + expected.fieldId(),
            AvroSchemaUtil.toOption(AvroSchemaUtil.convert(expected.type())),
            null,
            JsonProperties.NULL_VALUE);
    placeholder.addProp(AvroSchemaUtil.FIELD_ID_PROP, expected.fieldId());

    // keep the original name when making it Avro-compatible altered it
    if (!expected.name().equals(avroName)) {
      placeholder.addProp(AvroSchemaUtil.ICEBERG_FIELD_NAME_PROP, expected.name());
    }

    return placeholder;
  }

  /**
   * Projects a single record field.
   *
   * <p>The expected field is looked up by the file field's ID. While the field's schema is
   * resolved, {@code current} points at the expected field's type; the enclosing struct is restored
   * afterwards, even if resolution throws.
   *
   * @param field the field from the file schema
   * @param fieldResult supplies the projected schema of the field
   * @return a copy of the field (under the expected name when the schema or name differs), or
   *     null when the expected struct has no field with this ID
   */
  @Override
  public Schema.Field field(Schema.Field field, Supplier<Schema> fieldResult) {
    Types.StructType enclosing = current.asNestedType().asStructType();
    Types.NestedField expected = enclosing.field(AvroSchemaUtil.getFieldId(field));
    if (expected == null) {
      // the field isn't present in the expected struct, so it was not selected
      return null;
    }

    String expectedName = expected.name();
    this.current = expected.type();
    try {
      Schema projectedSchema = fieldResult.get();
      boolean unchanged =
          Objects.equals(projectedSchema, field.schema()) && expectedName.equals(field.name());
      if (unchanged) {
        // always copy because fields can't be reused
        return AvroSchemaUtil.copyField(field, field.schema(), field.name());
      }

      // add an alias for the field
      return AvroSchemaUtil.copyField(
          field, projectedSchema, AvroSchemaUtil.makeCompatibleName(expectedName));
    } finally {
      this.current = enclosing;
    }
  }

  /**
   * Projects an option union.
   *
   * @param union the union schema from the file
   * @param options the projected schemas of the union's branches
   * @return the original union when its non-null branch is unchanged, otherwise a new option
   *     around the projected branch
   * @throws IllegalStateException if the union is not an option schema
   */
  @Override
  public Schema union(Schema union, Iterable<Schema> options) {
    Preconditions.checkState(
        AvroSchemaUtil.isOptionSchema(union),
        "Invalid schema: non-option unions are not supported: %s",
        union);

    Schema fileBranch = AvroSchemaUtil.fromOption(union);
    Schema projectedBranch = AvroSchemaUtil.fromOptions(Lists.newArrayList(options));

    return Objects.equals(fileBranch, projectedBranch)
        ? union
        : AvroSchemaUtil.toOption(projectedBranch);
  }

  /**
   * Projects an Avro array, which holds either a list or a map stored as key/value records.
   *
   * <p>The array is handled as a map when it carries the {@code LogicalMap} logical type, or when
   * the expected type is a map and the element is a key/value schema. Otherwise it is handled as
   * a list.
   *
   * @param array the array schema from the file
   * @param element supplies the projected element schema
   * @return the original array when nothing needs to change, otherwise a rebuilt schema
   * @throws IllegalArgumentException if the expected type does not match the array's kind
   */
  @Override
  public Schema array(Schema array, Supplier<Schema> element) {
    boolean storesMap =
        array.getLogicalType() instanceof LogicalMap
            || (current.isMapType() && AvroSchemaUtil.isKeyValueSchema(array.getElementType()));

    return storesMap ? projectKeyValueArray(array, element) : projectList(array, element);
  }

  /**
   * Projects an array of key/value records against the expected map type.
   *
   * <p>The schema is rebuilt through {@code AvroSchemaUtil.createProjectionMap} when the value
   * schema changed, and also when it did not change but the array lacks the {@code LogicalMap}
   * logical type.
   */
  private Schema projectKeyValueArray(Schema array, Supplier<Schema> element) {
    Preconditions.checkArgument(current.isMapType(), "Incompatible projected type: %s", current);
    Types.MapType expectedMap = current.asNestedType().asMapType();

    // create a struct to correspond to the key/value element record
    this.current = Types.StructType.of(expectedMap.fields());
    try {
      Schema keyValue = array.getElementType();
      List<Schema.Field> pair = keyValue.getFields();
      Schema.Field key = pair.get(0);
      Schema.Field value = pair.get(1);
      Schema projectedValue = element.get().getField("value").schema();

      boolean valueChanged = !Objects.equals(projectedValue, value.schema());
      if (!valueChanged && array.getLogicalType() instanceof LogicalMap) {
        return array;
      }

      return AvroSchemaUtil.createProjectionMap(
          keyValue.getFullName(),
          AvroSchemaUtil.getFieldId(key),
          key.name(),
          key.schema(),
          AvroSchemaUtil.getFieldId(value),
          value.name(),
          valueChanged ? projectedValue : value.schema());
    } finally {
      this.current = expectedMap;
    }
  }

  /** Projects an array against the expected list type, replacing the element when it changed. */
  private Schema projectList(Schema array, Supplier<Schema> element) {
    Preconditions.checkArgument(current.isListType(), "Incompatible projected type: %s", current);
    Types.ListType expectedList = current.asNestedType().asListType();

    this.current = expectedList.elementType();
    try {
      Schema projectedElement = element.get();
      return Objects.equals(projectedElement, array.getElementType())
          ? array
          : AvroSchemaUtil.replaceElement(array, projectedElement);
    } finally {
      this.current = expectedList;
    }
  }

  /**
   * Projects an Avro map against the expected map type.
   *
   * @param map the map schema from the file
   * @param value supplies the projected value schema
   * @return the original map when the value schema is unchanged, otherwise a map with the
   *     projected value schema
   * @throws IllegalArgumentException if the expected type is not a map or its key type is not
   *     string
   */
  @Override
  public Schema map(Schema map, Supplier<Schema> value) {
    Preconditions.checkArgument(
        current.isNestedType() && current.asNestedType().isMapType(),
        "Incompatible projected type: %s",
        current);
    Types.MapType expectedMap = current.asNestedType().asMapType();
    Preconditions.checkArgument(
        Types.StringType.get() == expectedMap.keyType(),
        "Incompatible projected type: key type %s is not string",
        expectedMap.keyType());

    this.current = expectedMap.valueType();
    try {
      Schema projectedValue = value.get();
      return Objects.equals(projectedValue, map.getValueType())
          ? map
          : AvroSchemaUtil.replaceValue(map, projectedValue);
    } finally {
      this.current = expectedMap;
    }
  }

  /**
   * Projects a primitive, applying type promotion where the expected type is wider.
   *
   * @param primitive the primitive schema from the file
   * @return a long schema for an int read as long, a double schema for a float read as double,
   *     and the original schema in every other case
   */
  @Override
  public Schema primitive(Schema primitive) {
    Schema.Type fileType = primitive.getType();

    if (fileType == Schema.Type.INT && current.typeId() == Type.TypeID.LONG) {
      return Schema.create(Schema.Type.LONG);
    }

    if (fileType == Schema.Type.FLOAT && current.typeId() == Type.TypeID.DOUBLE) {
      return Schema.create(Schema.Type.DOUBLE);
    }

    return primitive;
  }
}