| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| package org.apache.impala.service; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import org.apache.iceberg.hadoop.HadoopTables; |
| import org.apache.iceberg.Schema; |
| import org.apache.iceberg.types.Types; |
| import org.apache.impala.catalog.ArrayType; |
| import org.apache.impala.catalog.MapType; |
| import org.apache.impala.catalog.ScalarType; |
| import org.apache.impala.catalog.StructField; |
| import org.apache.impala.catalog.StructType; |
| import org.apache.impala.catalog.Type; |
| import org.apache.impala.common.ImpalaRuntimeException; |
| import org.apache.impala.thrift.TColumn; |
| import org.apache.impala.thrift.TCreateTableParams; |
| import org.apache.impala.util.IcebergUtil; |
| import org.apache.log4j.Logger; |
| |
| import com.google.common.base.Preconditions; |
| |
| /** |
| * This is a helper for the CatalogOpExecutor to provide Iceberg related DDL functionality |
| * such as creating and dropping tables from Iceberg. |
| */ |
| public class IcebergCatalogOpExecutor { |
| public static final Logger LOG = Logger.getLogger(IcebergCatalogOpExecutor.class); |
| |
| // Keep id increase for each thread |
| private static ThreadLocal<Integer> iThreadLocal = new ThreadLocal<>(); |
| |
| public static void createTable(String metadataLoc, TCreateTableParams params) |
| throws ImpalaRuntimeException { |
| // Each table id increase from zero |
| iThreadLocal.set(0); |
| HadoopTables tables = IcebergUtil.getHadoopTables(); |
| Schema schema = createIcebergSchema(params); |
| tables.create(schema, IcebergUtil.createIcebergPartition(schema, params), |
| metadataLoc); |
| LOG.info("Create iceberg table successful."); |
| } |
| |
| /** |
| * Transform a StructField to Iceberg NestedField |
| */ |
| private static Types.NestedField createIcebergNestedField(StructField structField) |
| throws ImpalaRuntimeException { |
| Preconditions.checkState(structField != null); |
| org.apache.iceberg.types.Type icebergType = createIcebergType(structField.getType()); |
| Types.NestedField filed = |
| Types.NestedField.required(getNextId(), structField.getName(), icebergType, |
| structField.getComment()); |
| return filed; |
| } |
| |
| /** |
| * Transform a TColumn to Iceberg NestedField |
| */ |
| private static Types.NestedField createIcebergNestedField(TColumn column) |
| throws ImpalaRuntimeException { |
| Type type = Type.fromThrift(column.getColumnType()); |
| Preconditions.checkState(type != null); |
| org.apache.iceberg.types.Type icebergType = createIcebergType(type); |
| Types.NestedField filed = |
| Types.NestedField.required(getNextId(), column.getColumnName(), icebergType, |
| column.getComment()); |
| return filed; |
| } |
| |
| /** |
| * Build iceberg schema by parameters. |
| */ |
| private static Schema createIcebergSchema(TCreateTableParams params) |
| throws ImpalaRuntimeException { |
| List<Types.NestedField> fields = new ArrayList<Types.NestedField>(); |
| for (TColumn column : params.getColumns()) { |
| fields.add(createIcebergNestedField(column)); |
| } |
| return new Schema(fields); |
| } |
| |
| /** |
| * Converts a given Impala catalog type to the Iceberg type, and |
| * id is necessary for each iceberg complex type |
| */ |
| public static org.apache.iceberg.types.Type createIcebergType(Type t) |
| throws ImpalaRuntimeException { |
| if (t.isScalarType()) { |
| ScalarType s = (ScalarType) t; |
| switch (s.getPrimitiveType()) { |
| case INT: |
| return Types.IntegerType.get(); |
| case BIGINT: |
| return Types.LongType.get(); |
| case BOOLEAN: |
| return Types.BooleanType.get(); |
| case STRING: |
| return Types.StringType.get(); |
| case DOUBLE: |
| return Types.DoubleType.get(); |
| case FLOAT: |
| return Types.FloatType.get(); |
| case TIMESTAMP: |
| return Types.TimestampType.withZone(); |
| case DECIMAL: |
| return Types.DecimalType.of(s.decimalPrecision(), s.decimalScale()); |
| case DATE: |
| return Types.DateType.get(); |
| case BINARY: |
| return Types.BinaryType.get(); |
| /* Fall through below */ |
| case INVALID_TYPE: |
| case NULL_TYPE: |
| case DATETIME: |
| case CHAR: |
| case TINYINT: |
| case SMALLINT: |
| case VARCHAR: |
| default: |
| throw new ImpalaRuntimeException(String.format( |
| "Type %s is not supported in Iceberg", s.toSql())); |
| } |
| } else if (t.isArrayType()) { |
| ArrayType arrayType = (ArrayType) t; |
| return Types.ListType.ofRequired(getNextId(), |
| createIcebergType(arrayType.getItemType())); |
| } else if (t.isMapType()) { |
| MapType mapType = (MapType) t; |
| return Types.MapType.ofRequired(getNextId(), getNextId(), |
| createIcebergType(mapType.getKeyType()), |
| createIcebergType(mapType.getValueType())); |
| } else if (t.isStructType()) { |
| StructType structType = (StructType) t; |
| List<Types.NestedField> nestedFields = new ArrayList<Types.NestedField>(); |
| List<StructField> structFields = structType.getFields(); |
| for (StructField structField : structFields) { |
| nestedFields.add(createIcebergNestedField(structField)); |
| } |
| return Types.StructType.of(nestedFields); |
| } else { |
| throw new ImpalaRuntimeException(String.format( |
| "Type %s is not supported in Iceberg", t.toSql())); |
| } |
| } |
| |
| private static int getNextId() { |
| int nextId = iThreadLocal.get(); |
| iThreadLocal.set(nextId+1); |
| return nextId; |
| } |
| } |