| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iceberg.orc; |
| |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.stream.Collectors; |
| import org.apache.iceberg.Schema; |
| import org.apache.iceberg.mapping.NameMapping; |
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; |
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMultimap; |
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; |
| import org.apache.iceberg.types.Type; |
| import org.apache.iceberg.types.TypeUtil; |
| import org.apache.iceberg.types.Types; |
| import org.apache.orc.TypeDescription; |
| |
| /** |
| * Utilities for mapping Iceberg to ORC schemas. |
| */ |
| public final class ORCSchemaUtil { |
| |
| public enum BinaryType { |
| UUID, FIXED, BINARY |
| } |
| |
| public enum LongType { |
| TIME, LONG |
| } |
| |
| private static class OrcField { |
| private final String name; |
| private final TypeDescription type; |
| |
| OrcField(String name, TypeDescription type) { |
| this.name = name; |
| this.type = type; |
| } |
| |
| public String name() { |
| return name; |
| } |
| |
| public TypeDescription type() { |
| return type; |
| } |
| } |
| |
| static final String ICEBERG_ID_ATTRIBUTE = "iceberg.id"; |
| static final String ICEBERG_REQUIRED_ATTRIBUTE = "iceberg.required"; |
| |
| /** |
| * The name of the ORC {@link TypeDescription} attribute indicating the Iceberg type corresponding to an |
| * ORC binary type. The values for this attribute are denoted in {@code BinaryType}. |
| */ |
| public static final String ICEBERG_BINARY_TYPE_ATTRIBUTE = "iceberg.binary-type"; |
| /** |
| * The name of the ORC {@link TypeDescription} attribute indicating the Iceberg type corresponding to an |
| * ORC long type. The values for this attribute are denoted in {@code LongType}. |
| */ |
| public static final String ICEBERG_LONG_TYPE_ATTRIBUTE = "iceberg.long-type"; |
| static final String ICEBERG_FIELD_LENGTH = "iceberg.length"; |
| |
| private static final ImmutableMultimap<Type.TypeID, TypeDescription.Category> TYPE_MAPPING = |
| ImmutableMultimap.<Type.TypeID, TypeDescription.Category>builder() |
| .put(Type.TypeID.BOOLEAN, TypeDescription.Category.BOOLEAN) |
| .put(Type.TypeID.INTEGER, TypeDescription.Category.BYTE) |
| .put(Type.TypeID.INTEGER, TypeDescription.Category.SHORT) |
| .put(Type.TypeID.INTEGER, TypeDescription.Category.INT) |
| .put(Type.TypeID.LONG, TypeDescription.Category.LONG) |
| .put(Type.TypeID.TIME, TypeDescription.Category.LONG) |
| .put(Type.TypeID.FLOAT, TypeDescription.Category.FLOAT) |
| .put(Type.TypeID.DOUBLE, TypeDescription.Category.DOUBLE) |
| .put(Type.TypeID.DATE, TypeDescription.Category.DATE) |
| .put(Type.TypeID.STRING, TypeDescription.Category.CHAR) |
| .put(Type.TypeID.STRING, TypeDescription.Category.VARCHAR) |
| .put(Type.TypeID.STRING, TypeDescription.Category.STRING) |
| .put(Type.TypeID.UUID, TypeDescription.Category.BINARY) |
| .put(Type.TypeID.FIXED, TypeDescription.Category.BINARY) |
| .put(Type.TypeID.BINARY, TypeDescription.Category.BINARY) |
| .put(Type.TypeID.DECIMAL, TypeDescription.Category.DECIMAL) |
| .build(); |
| |
| private ORCSchemaUtil() { |
| } |
| |
| public static TypeDescription convert(Schema schema) { |
| final TypeDescription root = TypeDescription.createStruct(); |
| final Types.StructType schemaRoot = schema.asStruct(); |
| for (Types.NestedField field : schemaRoot.asStructType().fields()) { |
| TypeDescription orcColumType = convert(field.fieldId(), field.type(), field.isRequired()); |
| root.addField(field.name(), orcColumType); |
| } |
| return root; |
| } |
| |
| private static TypeDescription convert(Integer fieldId, Type type, boolean isRequired) { |
| final TypeDescription orcType; |
| |
| switch (type.typeId()) { |
| case BOOLEAN: |
| orcType = TypeDescription.createBoolean(); |
| break; |
| case INTEGER: |
| orcType = TypeDescription.createInt(); |
| break; |
| case TIME: |
| orcType = TypeDescription.createLong(); |
| orcType.setAttribute(ICEBERG_LONG_TYPE_ATTRIBUTE, LongType.TIME.toString()); |
| break; |
| case LONG: |
| orcType = TypeDescription.createLong(); |
| orcType.setAttribute(ICEBERG_LONG_TYPE_ATTRIBUTE, LongType.LONG.toString()); |
| break; |
| case FLOAT: |
| orcType = TypeDescription.createFloat(); |
| break; |
| case DOUBLE: |
| orcType = TypeDescription.createDouble(); |
| break; |
| case DATE: |
| orcType = TypeDescription.createDate(); |
| break; |
| case TIMESTAMP: |
| Types.TimestampType tsType = (Types.TimestampType) type; |
| if (tsType.shouldAdjustToUTC()) { |
| orcType = TypeDescription.createTimestampInstant(); |
| } else { |
| orcType = TypeDescription.createTimestamp(); |
| } |
| break; |
| case STRING: |
| orcType = TypeDescription.createString(); |
| break; |
| case UUID: |
| orcType = TypeDescription.createBinary(); |
| orcType.setAttribute(ICEBERG_BINARY_TYPE_ATTRIBUTE, BinaryType.UUID.toString()); |
| break; |
| case FIXED: |
| orcType = TypeDescription.createBinary(); |
| orcType.setAttribute(ICEBERG_BINARY_TYPE_ATTRIBUTE, BinaryType.FIXED.toString()); |
| orcType.setAttribute(ICEBERG_FIELD_LENGTH, Integer.toString(((Types.FixedType) type).length())); |
| break; |
| case BINARY: |
| orcType = TypeDescription.createBinary(); |
| orcType.setAttribute(ICEBERG_BINARY_TYPE_ATTRIBUTE, BinaryType.BINARY.toString()); |
| break; |
| case DECIMAL: { |
| Types.DecimalType decimal = (Types.DecimalType) type; |
| orcType = TypeDescription.createDecimal() |
| .withScale(decimal.scale()) |
| .withPrecision(decimal.precision()); |
| break; |
| } |
| case STRUCT: { |
| orcType = TypeDescription.createStruct(); |
| for (Types.NestedField field : type.asStructType().fields()) { |
| TypeDescription childType = convert(field.fieldId(), field.type(), field.isRequired()); |
| orcType.addField(field.name(), childType); |
| } |
| break; |
| } |
| case LIST: { |
| Types.ListType list = (Types.ListType) type; |
| TypeDescription elementType = convert(list.elementId(), list.elementType(), |
| list.isElementRequired()); |
| orcType = TypeDescription.createList(elementType); |
| break; |
| } |
| case MAP: { |
| Types.MapType map = (Types.MapType) type; |
| TypeDescription keyType = convert(map.keyId(), map.keyType(), true); |
| TypeDescription valueType = convert(map.valueId(), map.valueType(), map.isValueRequired()); |
| orcType = TypeDescription.createMap(keyType, valueType); |
| break; |
| } |
| default: |
| throw new IllegalArgumentException("Unhandled type " + type.typeId()); |
| } |
| |
| // Set Iceberg column attributes for mapping |
| orcType.setAttribute(ICEBERG_ID_ATTRIBUTE, String.valueOf(fieldId)); |
| orcType.setAttribute(ICEBERG_REQUIRED_ATTRIBUTE, String.valueOf(isRequired)); |
| return orcType; |
| } |
| |
| /** |
| * Convert an ORC schema to an Iceberg schema. This method handles the convertion from the original |
| * Iceberg column mapping IDs if present in the ORC column attributes, otherwise, ORC columns with no |
| * Iceberg IDs will be ignored and skipped in the conversion. |
| * |
| * @return the Iceberg schema |
| * @throws IllegalArgumentException if ORC schema has no columns with Iceberg ID attributes |
| */ |
| public static Schema convert(TypeDescription orcSchema) { |
| List<TypeDescription> children = orcSchema.getChildren(); |
| List<String> childrenNames = orcSchema.getFieldNames(); |
| Preconditions.checkState(children.size() == childrenNames.size(), |
| "Error in ORC file, children fields and names do not match."); |
| |
| OrcToIcebergVisitor schemaConverter = new OrcToIcebergVisitor(); |
| List<Types.NestedField> fields = OrcToIcebergVisitor.visitSchema(orcSchema, schemaConverter).stream() |
| .filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList()); |
| |
| if (fields.size() == 0) { |
| throw new IllegalArgumentException("ORC schema does not contain Iceberg IDs"); |
| } |
| |
| return new Schema(fields); |
| } |
| |
| /** |
| * Converts an Iceberg schema to a corresponding ORC schema within the context of an existing |
| * ORC file schema. |
| * This method also handles schema evolution from the original ORC file schema |
| * to the given Iceberg schema. It builds the desired reader schema with the schema |
| * evolution rules and pass that down to the ORC reader, |
| * which would then use its schema evolution to map that to the writer’s schema. |
| * |
| * Example: |
| * <code> |
| * Iceberg writer ORC writer |
| * struct<a (1): int, b (2): string> struct<a: int, b: string> |
| * struct<a (1): struct<b (2): string, c (3): date>> struct<a: struct<b:string, c:date>> |
| * </code> |
| * |
| * Iceberg reader ORC reader |
| * <code> |
| * struct<a (2): string, c (3): date> struct<b: string, c: date> |
| * struct<aa (1): struct<cc (3): date, bb (2): string>> struct<a: struct<c:date, b:string>> |
| * </code> |
| * |
| * @param schema an Iceberg schema |
| * @param originalOrcSchema an existing ORC file schema |
| * @return the resulting ORC schema |
| */ |
| public static TypeDescription buildOrcProjection(Schema schema, |
| TypeDescription originalOrcSchema) { |
| final Map<Integer, OrcField> icebergToOrc = icebergToOrcMapping("root", originalOrcSchema); |
| return buildOrcProjection(Integer.MIN_VALUE, schema.asStruct(), true, icebergToOrc); |
| } |
| |
| private static TypeDescription buildOrcProjection(Integer fieldId, Type type, boolean isRequired, |
| Map<Integer, OrcField> mapping) { |
| final TypeDescription orcType; |
| |
| switch (type.typeId()) { |
| case STRUCT: |
| orcType = TypeDescription.createStruct(); |
| for (Types.NestedField nestedField : type.asStructType().fields()) { |
| // Using suffix _r to avoid potential underlying issues in ORC reader |
| // with reused column names between ORC and Iceberg; |
| // e.g. renaming column c -> d and adding new column d |
| String name = Optional.ofNullable(mapping.get(nestedField.fieldId())) |
| .map(OrcField::name) |
| .orElseGet(() -> nestedField.name() + "_r" + nestedField.fieldId()); |
| TypeDescription childType = buildOrcProjection(nestedField.fieldId(), nestedField.type(), |
| isRequired && nestedField.isRequired(), mapping); |
| orcType.addField(name, childType); |
| } |
| break; |
| case LIST: |
| Types.ListType list = (Types.ListType) type; |
| TypeDescription elementType = buildOrcProjection(list.elementId(), list.elementType(), |
| isRequired && list.isElementRequired(), mapping); |
| orcType = TypeDescription.createList(elementType); |
| break; |
| case MAP: |
| Types.MapType map = (Types.MapType) type; |
| TypeDescription keyType = buildOrcProjection(map.keyId(), map.keyType(), isRequired, mapping); |
| TypeDescription valueType = buildOrcProjection(map.valueId(), map.valueType(), |
| isRequired && map.isValueRequired(), mapping); |
| orcType = TypeDescription.createMap(keyType, valueType); |
| break; |
| default: |
| if (mapping.containsKey(fieldId)) { |
| TypeDescription originalType = mapping.get(fieldId).type(); |
| Optional<TypeDescription> promotedType = getPromotedType(type, originalType); |
| |
| if (promotedType.isPresent()) { |
| orcType = promotedType.get(); |
| } else { |
| Preconditions.checkArgument(isSameType(originalType, type), |
| "Can not promote %s type to %s", |
| originalType.getCategory(), type.typeId().name()); |
| orcType = originalType.clone(); |
| } |
| } else { |
| if (isRequired) { |
| throw new IllegalArgumentException( |
| String.format("Field %d of type %s is required and was not found.", fieldId, type.toString())); |
| } |
| |
| orcType = convert(fieldId, type, false); |
| } |
| } |
| orcType.setAttribute(ICEBERG_ID_ATTRIBUTE, fieldId.toString()); |
| return orcType; |
| } |
| |
| private static Map<Integer, OrcField> icebergToOrcMapping(String name, TypeDescription orcType) { |
| Map<Integer, OrcField> icebergToOrc = Maps.newHashMap(); |
| switch (orcType.getCategory()) { |
| case STRUCT: |
| List<String> childrenNames = orcType.getFieldNames(); |
| List<TypeDescription> children = orcType.getChildren(); |
| for (int i = 0; i < children.size(); i++) { |
| icebergToOrc.putAll(icebergToOrcMapping(childrenNames.get(i), children.get(i))); |
| } |
| break; |
| case LIST: |
| icebergToOrc.putAll(icebergToOrcMapping("element", orcType.getChildren().get(0))); |
| break; |
| case MAP: |
| icebergToOrc.putAll(icebergToOrcMapping("key", orcType.getChildren().get(0))); |
| icebergToOrc.putAll(icebergToOrcMapping("value", orcType.getChildren().get(1))); |
| break; |
| } |
| |
| if (orcType.getId() > 0) { |
| // Only add to non-root types. |
| icebergID(orcType) |
| .ifPresent(integer -> icebergToOrc.put(integer, new OrcField(name, orcType))); |
| } |
| |
| return icebergToOrc; |
| } |
| |
| |
| private static Optional<TypeDescription> getPromotedType(Type icebergType, |
| TypeDescription originalOrcType) { |
| TypeDescription promotedOrcType = null; |
| if (Type.TypeID.LONG.equals(icebergType.typeId()) && |
| TypeDescription.Category.INT.equals(originalOrcType.getCategory())) { |
| // Promote: int to long |
| promotedOrcType = TypeDescription.createLong(); |
| } else if (Type.TypeID.DOUBLE.equals(icebergType.typeId()) && |
| TypeDescription.Category.FLOAT.equals(originalOrcType.getCategory())) { |
| // Promote: float to double |
| promotedOrcType = TypeDescription.createDouble(); |
| } else if (Type.TypeID.DECIMAL.equals(icebergType.typeId()) && |
| TypeDescription.Category.DECIMAL.equals(originalOrcType.getCategory())) { |
| // Promote: decimal(P, S) to decimal(P', S) if P' > P |
| Types.DecimalType newDecimal = (Types.DecimalType) icebergType; |
| if (newDecimal.scale() == originalOrcType.getScale() && |
| newDecimal.precision() > originalOrcType.getPrecision()) { |
| promotedOrcType = TypeDescription.createDecimal() |
| .withScale(newDecimal.scale()) |
| .withPrecision(newDecimal.precision()); |
| } |
| } |
| return Optional.ofNullable(promotedOrcType); |
| } |
| |
| private static boolean isSameType(TypeDescription orcType, Type icebergType) { |
| if (icebergType.typeId() == Type.TypeID.TIMESTAMP) { |
| Types.TimestampType tsType = (Types.TimestampType) icebergType; |
| return Objects.equals( |
| tsType.shouldAdjustToUTC() ? TypeDescription.Category.TIMESTAMP_INSTANT : TypeDescription.Category.TIMESTAMP, |
| orcType.getCategory()); |
| } else { |
| return TYPE_MAPPING.containsEntry(icebergType.typeId(), orcType.getCategory()); |
| } |
| } |
| |
| static Optional<Integer> icebergID(TypeDescription orcType) { |
| return Optional.ofNullable(orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE)) |
| .map(Integer::parseInt); |
| } |
| |
| static int fieldId(TypeDescription orcType) { |
| String idStr = orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE); |
| Preconditions.checkNotNull(idStr, "Missing expected '%s' property", ICEBERG_ID_ATTRIBUTE); |
| return Integer.parseInt(idStr); |
| } |
| |
| static boolean isOptional(TypeDescription orcType) { |
| String isRequiredStr = orcType.getAttributeValue(ICEBERG_REQUIRED_ATTRIBUTE); |
| if (isRequiredStr != null) { |
| return !Boolean.parseBoolean(isRequiredStr); |
| } |
| return true; |
| } |
| |
| static TypeDescription removeIds(TypeDescription type) { |
| return OrcSchemaVisitor.visit(type, new RemoveIds()); |
| } |
| |
| static boolean hasIds(TypeDescription orcSchema) { |
| return OrcSchemaVisitor.visit(orcSchema, new HasIds()); |
| } |
| |
| static TypeDescription applyNameMapping(TypeDescription orcSchema, NameMapping nameMapping) { |
| return OrcSchemaVisitor.visit(orcSchema, new ApplyNameMapping(nameMapping)); |
| } |
| |
| /** |
| * Generates mapping from field IDs to ORC qualified names. See {@link IdToOrcName} for details. |
| */ |
| public static Map<Integer, String> idToOrcName(Schema schema) { |
| return TypeUtil.visit(schema, new IdToOrcName()); |
| } |
| } |