// blob: f5378130adbe9a50551424c33df81cac87103ad9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.schema;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.iceberg.Schema;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
/**
 * Schema visitor that walks a new schema next to an existing (partner) schema and records, on an
 * {@link UpdateSchema} operation, the changes needed to evolve the existing schema into the union of the two.
 * <p>
 * Nodes of the new schema are paired with nodes of the existing schema by field name; partners are tracked by the
 * field id they have in the existing schema. Every callback returns {@code true} when the visited node has no
 * partner (it is missing from the existing schema) and {@code false} when a partner exists.
 */
public class UnionByNameVisitor extends SchemaWithPartnerVisitor<Integer, Boolean> {
  /** Partner id used for the root struct of the existing schema, which has no field id of its own. */
  private static final int ROOT_ID = -1;

  private final UpdateSchema api;
  private final Schema partnerSchema;

  private UnionByNameVisitor(UpdateSchema api, Schema partnerSchema) {
    this.api = api;
    this.partnerSchema = partnerSchema;
  }

  /**
   * Registers, on the given {@link UpdateSchema}, the changes that turn existingSchema into a union of
   * existingSchema and newSchema.
   * <p>
   * Nothing is returned; the changes are accumulated on {@code api}.
   *
   * @param api an UpdateSchema that receives the changes
   * @param existingSchema the schema to evolve
   * @param newSchema the schema whose fields are merged into the existing one
   */
  public static void visit(UpdateSchema api, Schema existingSchema, Schema newSchema) {
    UnionByNameVisitor visitor = new UnionByNameVisitor(api, existingSchema);
    PartnerIdByNameAccessors accessors = new PartnerIdByNameAccessors(existingSchema);
    visit(newSchema, ROOT_ID, visitor, accessors);
  }

  /**
   * Handles a struct of the new schema: each child reported as missing is added under the partner struct, and each
   * child that has a partner is passed to {@link #updateColumn}.
   *
   * @param struct the struct from the new schema
   * @param partnerId id of the matching struct in the existing schema, or null when there is none
   * @param missingPositions per-child results, in the same order as {@code struct.fields()}
   * @return true when the struct itself has no partner, false otherwise
   */
  @Override
  public Boolean struct(Types.StructType struct, Integer partnerId, List<Boolean> missingPositions) {
    if (partnerId == null) {
      return true;
    }

    int parentId = partnerId;
    List<Types.NestedField> children = struct.fields();
    Types.StructType partnerStruct = findFieldType(parentId).asStructType();

    IntStream.range(0, missingPositions.size())
        .forEach(pos -> syncChild(parentId, partnerStruct, missingPositions.get(pos), children.get(pos)));

    return false;
  }

  /** Reports a field as missing exactly when it has no partner id. */
  @Override
  public Boolean field(Types.NestedField field, Integer partnerId, Boolean isFieldMissing) {
    return partnerId == null;
  }

  /**
   * Handles a list of the new schema by updating the partner list's element field.
   *
   * @return true when the list has no partner, false otherwise
   * @throws IllegalStateException (via {@link Preconditions#checkState}) if the list has a partner but its element
   *     was reported missing
   */
  @Override
  public Boolean list(Types.ListType list, Integer partnerId, Boolean isElementMissing) {
    if (partnerId == null) {
      return true;
    }

    Preconditions.checkState(!isElementMissing, "Error traversing schemas: element is missing, but list is present");

    List<Types.NestedField> partnerFields = findFieldType(partnerId).asListType().fields();
    updateColumn(list.fields().get(0), partnerFields.get(0));

    return false;
  }

  /**
   * Handles a map of the new schema by updating the partner map's key field and then its value field.
   *
   * @return true when the map has no partner, false otherwise
   * @throws IllegalStateException (via {@link Preconditions#checkState}) if the map has a partner but its key or
   *     value was reported missing
   */
  @Override
  public Boolean map(Types.MapType map, Integer partnerId, Boolean isKeyMissing, Boolean isValueMissing) {
    if (partnerId == null) {
      return true;
    }

    Preconditions.checkState(!isKeyMissing, "Error traversing schemas: key is missing, but map is present");
    Preconditions.checkState(!isValueMissing, "Error traversing schemas: value is missing, but map is present");

    List<Types.NestedField> partnerFields = findFieldType(partnerId).asMapType().fields();
    // index 0 is updated before index 1, matching the key/value order of the precondition checks above
    updateColumn(map.fields().get(0), partnerFields.get(0));
    updateColumn(map.fields().get(1), partnerFields.get(1));

    return false;
  }

  /** Reports a primitive as missing exactly when it has no partner id. */
  @Override
  public Boolean primitive(Type.PrimitiveType primitive, Integer partnerId) {
    return partnerId == null;
  }

  /**
   * Applies the add-or-update decision for a single child of a struct.
   *
   * @param parentId id of the partner struct that owns the child
   * @param partnerStruct type of the partner struct, used to look the existing child up by name
   * @param childMissing whether the child was reported as missing from the partner struct
   * @param child the child field from the new schema
   */
  private void syncChild(
      int parentId, Types.StructType partnerStruct, Boolean childMissing, Types.NestedField child) {
    if (childMissing) {
      addColumn(parentId, child);
      return;
    }

    updateColumn(child, partnerStruct.field(child.name()));
  }

  /** Returns the root struct of the partner schema for {@code ROOT_ID}, otherwise the type of the field with that id. */
  private Type findFieldType(int fieldId) {
    return fieldId == ROOT_ID ? partnerSchema.asStruct() : partnerSchema.findField(fieldId).type();
  }

  /**
   * Registers {@code field} as a new column whose parent is the partner-schema column with id {@code parentId};
   * the field's name, type and doc are passed through unchanged.
   */
  private void addColumn(int parentId, Types.NestedField field) {
    api.addColumn(partnerSchema.findColumnName(parentId), field.name(), field.type(), field.doc());
  }

  /**
   * Registers the updates needed to bring {@code existingField} in line with {@code field}:
   * <ul>
   *   <li>make the column optional when the new field is optional and the existing one is required;</li>
   *   <li>change the type when the new type is primitive and not equal to the existing type;</li>
   *   <li>change the doc when the new field has a doc that is not equal to the existing doc.</li>
   * </ul>
   * All three decisions are made before the first change is registered.
   */
  private void updateColumn(Types.NestedField field, Types.NestedField existingField) {
    String fullName = partnerSchema.findColumnName(existingField.fieldId());

    boolean relaxToOptional = field.isOptional() && existingField.isRequired();

    Type newType = field.type();
    boolean changeType = newType.isPrimitiveType() && !newType.equals(existingField.type());

    String newDoc = field.doc();
    boolean changeDoc = newDoc != null && !newDoc.equals(existingField.doc());

    if (relaxToOptional) {
      api.makeColumnOptional(fullName);
    }

    if (changeType) {
      api.updateColumn(fullName, newType.asPrimitiveType());
    }

    if (changeDoc) {
      api.updateColumnDoc(fullName, newDoc);
    }
  }

  /**
   * Partner accessors that identify each partner by its field id in the existing schema and resolve struct
   * children by name. Each lookup returns null when no partner can be found.
   */
  private static class PartnerIdByNameAccessors implements PartnerAccessors<Integer> {
    private final Schema partnerSchema;

    private PartnerIdByNameAccessors(Schema partnerSchema) {
      this.partnerSchema = partnerSchema;
    }

    /**
     * Looks {@code name} up in the partner struct ({@code ROOT_ID} selects the schema's root struct) and returns
     * the id of the field found there, or null when the struct has no field with that name.
     */
    @Override
    public Integer fieldPartner(Integer partnerFieldId, int fieldId, String name) {
      Types.StructType parent = partnerFieldId == ROOT_ID ?
          partnerSchema.asStruct() :
          partnerSchema.findField(partnerFieldId).type().asStructType();

      Types.NestedField match = parent.field(name);
      if (match == null) {
        return null;
      }

      return match.fieldId();
    }

    /** Returns the id of the first nested field of the partner map, or null when the map field is not found. */
    @Override
    public Integer mapKeyPartner(Integer partnerMapId) {
      Types.NestedField partnerMap = partnerSchema.findField(partnerMapId);
      if (partnerMap == null) {
        return null;
      }

      return partnerMap.type().asMapType().fields().get(0).fieldId();
    }

    /** Returns the id of the second nested field of the partner map, or null when the map field is not found. */
    @Override
    public Integer mapValuePartner(Integer partnerMapId) {
      Types.NestedField partnerMap = partnerSchema.findField(partnerMapId);
      if (partnerMap == null) {
        return null;
      }

      return partnerMap.type().asMapType().fields().get(1).fieldId();
    }

    /** Returns the id of the partner list's element field, or null when the list field is not found. */
    @Override
    public Integer listElementPartner(Integer partnerListId) {
      Types.NestedField partnerList = partnerSchema.findField(partnerListId);
      if (partnerList == null) {
        return null;
      }

      return partnerList.type().asListType().fields().get(0).fieldId();
    }
  }
}