blob: 78d8b445892c3901f62d2c3eebac3f056779f796 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.impala.util;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hive.HiveSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.impala.analysis.IcebergPartitionField;
import org.apache.impala.analysis.IcebergPartitionSpec;
import org.apache.impala.analysis.IcebergPartitionTransform;
import org.apache.impala.catalog.ArrayType;
import org.apache.impala.catalog.Column;
import org.apache.impala.catalog.IcebergColumn;
import org.apache.impala.catalog.IcebergStructField;
import org.apache.impala.catalog.MapType;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.StructField;
import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.thrift.TColumn;
import org.apache.impala.thrift.TColumnType;
import org.apache.impala.thrift.TIcebergPartitionTransformType;
/**
* Utility class for converting between Iceberg and Impala schemas and types.
*/
public class IcebergSchemaConverter {
  // The methods in this class are public and static, so they may be invoked from
  // multiple threads concurrently. This thread-local counter is therefore used to
  // generate unique field ids for each schema element. Note that Iceberg only cares
  // about the uniqueness of the field ids; they will be reassigned by Iceberg when
  // the schema is committed.
  private static final ThreadLocal<Integer> fieldIdCounter =
      ThreadLocal.withInitial(() -> 0);

  // Utility class with only static members; prevent instantiation.
  private IcebergSchemaConverter() {}

  /**
   * Transforms an Iceberg type to the corresponding Impala type. Nested types
   * (LIST/MAP/STRUCT) are converted recursively; struct members preserve their
   * Iceberg field ids via {@link IcebergStructField}.
   * @throws ImpalaRuntimeException for Iceberg types that have no Impala
   *     equivalent (e.g. TIME, UUID, FIXED).
   */
  public static Type toImpalaType(org.apache.iceberg.types.Type t)
      throws ImpalaRuntimeException {
    switch (t.typeId()) {
      case BOOLEAN:
        return Type.BOOLEAN;
      case INTEGER:
        return Type.INT;
      case LONG:
        return Type.BIGINT;
      case FLOAT:
        return Type.FLOAT;
      case DOUBLE:
        return Type.DOUBLE;
      case STRING:
        return Type.STRING;
      case DATE:
        return Type.DATE;
      case BINARY:
        return Type.BINARY;
      case TIMESTAMP:
        return Type.TIMESTAMP;
      case DECIMAL: {
        Types.DecimalType decimal = (Types.DecimalType) t;
        return ScalarType.createDecimalType(decimal.precision(), decimal.scale());
      }
      case LIST: {
        Types.ListType listType = (Types.ListType) t;
        return new ArrayType(toImpalaType(listType.elementType()));
      }
      case MAP: {
        Types.MapType mapType = (Types.MapType) t;
        return new MapType(toImpalaType(mapType.keyType()),
            toImpalaType(mapType.valueType()));
      }
      case STRUCT: {
        Types.StructType structType = (Types.StructType) t;
        List<StructField> structFields = new ArrayList<>();
        for (Types.NestedField nestedField : structType.fields()) {
          // Propagate the Iceberg field id from 'NestedField'.
          structFields.add(new IcebergStructField(nestedField.name(),
              toImpalaType(nestedField.type()), nestedField.doc(),
              nestedField.fieldId()));
        }
        return new StructType(structFields);
      }
      default:
        throw new ImpalaRuntimeException(String.format(
            "Iceberg type '%s' is not supported in Impala", t.typeId()));
    }
  }

  /**
   * Converts an Iceberg schema to a Hive schema (a list of FieldSchema). Column
   * names and SQL type strings are lowercased to match HMS conventions.
   * @throws ImpalaRuntimeException if a column type cannot be mapped to Impala.
   */
  public static List<FieldSchema> convertToHiveSchema(Schema schema)
      throws ImpalaRuntimeException {
    List<FieldSchema> ret = new ArrayList<>();
    for (Types.NestedField column : schema.columns()) {
      Type colType = toImpalaType(column.type());
      // Update sd cols by iceberg NestedField.
      ret.add(new FieldSchema(column.name().toLowerCase(), colType.toSql().toLowerCase(),
          column.doc()));
    }
    return ret;
  }

  /**
   * Converts an Iceberg schema to an Impala schema (a list of IcebergColumn).
   * Each column keeps its Iceberg field id; MAP columns additionally record the
   * field ids of their key and value members.
   * @throws ImpalaRuntimeException if a column type cannot be mapped to Impala.
   */
  public static List<Column> convertToImpalaSchema(Schema schema)
      throws ImpalaRuntimeException {
    List<Column> ret = new ArrayList<>();
    int pos = 0;
    for (Types.NestedField column : schema.columns()) {
      Type colType = toImpalaType(column.type());
      int keyId = -1, valueId = -1;
      if (colType.isMapType()) {
        // Get the real map key and value field ids if this column is a MAP type.
        Types.MapType mapType = (Types.MapType) column.type();
        keyId = mapType.keyId();
        valueId = mapType.valueId();
      }
      ret.add(new IcebergColumn(column.name(), colType, column.doc(), pos++,
          column.fieldId(), keyId, valueId, column.isOptional()));
    }
    return ret;
  }

  /**
   * Builds an Iceberg schema from an HMS table's storage-descriptor columns plus
   * its partition keys (partition columns become regular columns in Iceberg).
   */
  public static Schema convertToIcebergSchema(Table table) {
    List<FieldSchema> columns = Lists.newArrayList(table.getSd().getCols());
    columns.addAll(table.getPartitionKeys());
    return HiveSchemaUtil.convert(columns, false);
  }

  /**
   * Creates an Iceberg partition spec from an HMS table's partition keys, mapping
   * every Hive partition column to an identity-transform partition field.
   */
  public static PartitionSpec createIcebergPartitionSpec(Table table,
      Schema schema) {
    PartitionSpec.Builder specBuilder = PartitionSpec.builderFor(schema);
    for (FieldSchema partitionKey : table.getPartitionKeys()) {
      specBuilder.identity(partitionKey.getName());
    }
    return specBuilder.build();
  }

  /**
   * Generates an Iceberg schema from the given columns. It also assigns a unique
   * 'field id' to each schema element, although Iceberg will reassign the ids.
   * @throws ImpalaRuntimeException if a column type cannot be mapped to Iceberg.
   */
  public static Schema genIcebergSchema(List<TColumn> columns)
      throws ImpalaRuntimeException {
    // Field ids start from 1 for every invocation on this thread.
    fieldIdCounter.set(1);
    List<Types.NestedField> fields = new ArrayList<Types.NestedField>();
    for (TColumn column : columns) {
      fields.add(createIcebergField(column));
    }
    return new Schema(fields);
  }

  /**
   * Creates an Iceberg field from a thrift TColumn, assigning it the next unique
   * field id for this thread.
   */
  private static Types.NestedField createIcebergField(TColumn column)
      throws ImpalaRuntimeException {
    org.apache.iceberg.types.Type icebergType = fromImpalaColumnType(
        column.getColumnType());
    if (column.isIs_nullable()) {
      // Create an 'optional' field for nullable ('NULL') columns.
      return Types.NestedField.optional(
          nextId(), column.getColumnName(), icebergType, column.getComment());
    }
    // Create a 'required' field for 'NOT NULL' (and the default) situation.
    return Types.NestedField.required(
        nextId(), column.getColumnName(), icebergType, column.getComment());
  }

  /**
   * Transforms a thrift column type to an Iceberg type.
   * @throws ImpalaRuntimeException if the type is not supported in Iceberg.
   */
  public static org.apache.iceberg.types.Type fromImpalaColumnType(TColumnType colType)
      throws ImpalaRuntimeException {
    return fromImpalaType(Type.fromThrift(colType));
  }

  /**
   * Transforms an Impala type to an Iceberg type. Nested types are converted
   * recursively; nested members (list elements, map keys/values, struct fields)
   * are always created as 'required' and receive fresh thread-local field ids.
   * @throws ImpalaRuntimeException for Impala types with no Iceberg equivalent
   *     (e.g. TINYINT, SMALLINT, CHAR, VARCHAR).
   */
  public static org.apache.iceberg.types.Type fromImpalaType(Type t)
      throws ImpalaRuntimeException {
    if (t.isScalarType()) {
      ScalarType st = (ScalarType) t;
      switch (st.getPrimitiveType()) {
        case BOOLEAN:
          return Types.BooleanType.get();
        case INT:
          return Types.IntegerType.get();
        case BIGINT:
          return Types.LongType.get();
        case FLOAT:
          return Types.FloatType.get();
        case DOUBLE:
          return Types.DoubleType.get();
        case STRING:
          return Types.StringType.get();
        case DATE:
          return Types.DateType.get();
        case BINARY:
          return Types.BinaryType.get();
        case TIMESTAMP:
          // Impala TIMESTAMP has timestamp-without-time-zone semantics.
          return Types.TimestampType.withoutZone();
        case DECIMAL:
          return Types.DecimalType.of(st.decimalPrecision(), st.decimalScale());
        default:
          throw new ImpalaRuntimeException(String.format(
              "Type %s is not supported in Iceberg", t.toSql()));
      }
    } else if (t.isArrayType()) {
      ArrayType at = (ArrayType) t;
      return Types.ListType.ofRequired(nextId(), fromImpalaType(at.getItemType()));
    } else if (t.isMapType()) {
      MapType mt = (MapType) t;
      // Arguments evaluate left-to-right, so the key gets the smaller field id.
      return Types.MapType.ofRequired(nextId(), nextId(),
          fromImpalaType(mt.getKeyType()), fromImpalaType(mt.getValueType()));
    } else if (t.isStructType()) {
      StructType st = (StructType) t;
      List<Types.NestedField> icebergFields = new ArrayList<>();
      for (StructField field : st.getFields()) {
        icebergFields.add(Types.NestedField.required(nextId(), field.getName(),
            fromImpalaType(field.getType()), field.getComment()));
      }
      return Types.StructType.of(icebergFields);
    } else {
      throw new ImpalaRuntimeException(String.format(
          "Type %s is not supported in Iceberg", t.toSql()));
    }
  }

  /**
   * Returns the next unique field id for the current thread and advances the
   * thread-local counter.
   */
  private static int nextId() {
    int nextId = fieldIdCounter.get();
    fieldIdCounter.set(nextId + 1);
    return nextId;
  }
}