| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hudi.hive.util; |
| |
| import org.apache.hudi.hive.HiveSyncConfig; |
| import org.apache.hudi.hive.HoodieHiveSyncException; |
| import org.apache.hudi.hive.SchemaDifference; |
| |
| import org.apache.log4j.LogManager; |
| import org.apache.log4j.Logger; |
| import org.apache.parquet.schema.DecimalMetadata; |
| import org.apache.parquet.schema.GroupType; |
| import org.apache.parquet.schema.MessageType; |
| import org.apache.parquet.schema.OriginalType; |
| import org.apache.parquet.schema.PrimitiveType; |
| import org.apache.parquet.schema.Type; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| /** |
| * Schema Utilities. |
| */ |
| public class HiveSchemaUtil { |
| |
| private static final Logger LOG = LogManager.getLogger(HiveSchemaUtil.class); |
| public static final String HIVE_ESCAPE_CHARACTER = "`"; |
| |
| /** |
| * Get the schema difference between the storage schema and hive table schema. |
| */ |
| public static SchemaDifference getSchemaDifference(MessageType storageSchema, Map<String, String> tableSchema, |
| List<String> partitionKeys) { |
| return getSchemaDifference(storageSchema, tableSchema, partitionKeys, false); |
| } |
| |
| public static SchemaDifference getSchemaDifference(MessageType storageSchema, Map<String, String> tableSchema, |
| List<String> partitionKeys, boolean supportTimestamp) { |
| Map<String, String> newTableSchema; |
| try { |
| newTableSchema = convertParquetSchemaToHiveSchema(storageSchema, supportTimestamp); |
| } catch (IOException e) { |
| throw new HoodieHiveSyncException("Failed to convert parquet schema to hive schema", e); |
| } |
| LOG.info("Getting schema difference for " + tableSchema + "\r\n\r\n" + newTableSchema); |
| SchemaDifference.Builder schemaDiffBuilder = SchemaDifference.newBuilder(storageSchema, tableSchema); |
| Set<String> tableColumns = new HashSet<>(); |
| |
| for (Map.Entry<String, String> field : tableSchema.entrySet()) { |
| String fieldName = field.getKey().toLowerCase(); |
| String tickSurroundedFieldName = tickSurround(fieldName); |
| if (!isFieldExistsInSchema(newTableSchema, tickSurroundedFieldName) && !partitionKeys.contains(fieldName)) { |
| schemaDiffBuilder.deleteTableColumn(fieldName); |
| } else { |
| // check type |
| String tableColumnType = field.getValue(); |
| if (!isFieldExistsInSchema(newTableSchema, tickSurroundedFieldName)) { |
| if (partitionKeys.contains(fieldName)) { |
| // Partition key does not have to be part of the storage schema |
| continue; |
| } |
| // We will log this and continue. Hive schema is a superset of all parquet schemas |
| LOG.warn("Ignoring table column " + fieldName + " as its not present in the parquet schema"); |
| continue; |
| } |
| tableColumnType = tableColumnType.replaceAll("\\s+", ""); |
| |
| String expectedType = getExpectedType(newTableSchema, tickSurroundedFieldName); |
| expectedType = expectedType.replaceAll("\\s+", ""); |
| expectedType = expectedType.replaceAll("`", ""); |
| |
| if (!tableColumnType.equalsIgnoreCase(expectedType)) { |
| // check for incremental queries, the schema type change is allowed as per evolution |
| // rules |
| if (!isSchemaTypeUpdateAllowed(tableColumnType, expectedType)) { |
| throw new HoodieHiveSyncException("Could not convert field Type from " + tableColumnType + " to " |
| + expectedType + " for field " + fieldName); |
| } |
| schemaDiffBuilder.updateTableColumn(fieldName, getExpectedType(newTableSchema, tickSurroundedFieldName)); |
| } |
| } |
| tableColumns.add(tickSurroundedFieldName); |
| } |
| |
| for (Map.Entry<String, String> entry : newTableSchema.entrySet()) { |
| if (!tableColumns.contains(entry.getKey().toLowerCase())) { |
| schemaDiffBuilder.addTableColumn(entry.getKey(), entry.getValue()); |
| } |
| } |
| LOG.info("Difference between schemas: " + schemaDiffBuilder.build().toString()); |
| |
| return schemaDiffBuilder.build(); |
| } |
| |
| private static String getExpectedType(Map<String, String> newTableSchema, String fieldName) { |
| for (Map.Entry<String, String> entry : newTableSchema.entrySet()) { |
| if (entry.getKey().toLowerCase().equals(fieldName)) { |
| return entry.getValue(); |
| } |
| } |
| return null; |
| } |
| |
| private static boolean isFieldExistsInSchema(Map<String, String> newTableSchema, String fieldName) { |
| for (String entry : newTableSchema.keySet()) { |
| if (entry.toLowerCase().equals(fieldName)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * Returns equivalent Hive table schema read from a parquet file. |
| * |
| * @param messageType : Parquet Schema |
| * @return : Hive Table schema read from parquet file MAP[String,String] |
| */ |
| private static Map<String, String> convertParquetSchemaToHiveSchema(MessageType messageType, boolean supportTimestamp) throws IOException { |
| Map<String, String> schema = new LinkedHashMap<>(); |
| List<Type> parquetFields = messageType.getFields(); |
| for (Type parquetType : parquetFields) { |
| StringBuilder result = new StringBuilder(); |
| String key = parquetType.getName(); |
| if (parquetType.isRepetition(Type.Repetition.REPEATED)) { |
| result.append(createHiveArray(parquetType, "", supportTimestamp)); |
| } else { |
| result.append(convertField(parquetType, supportTimestamp)); |
| } |
| |
| schema.put(hiveCompatibleFieldName(key, false), result.toString()); |
| } |
| return schema; |
| } |
| |
| /** |
| * Convert one field data type of parquet schema into an equivalent Hive schema. |
| * |
| * @param parquetType : Single paruet field |
| * @return : Equivalent sHive schema |
| */ |
| private static String convertField(final Type parquetType, boolean supportTimestamp) { |
| StringBuilder field = new StringBuilder(); |
| if (parquetType.isPrimitive()) { |
| final PrimitiveType.PrimitiveTypeName parquetPrimitiveTypeName = |
| parquetType.asPrimitiveType().getPrimitiveTypeName(); |
| final OriginalType originalType = parquetType.getOriginalType(); |
| if (originalType == OriginalType.DECIMAL) { |
| final DecimalMetadata decimalMetadata = parquetType.asPrimitiveType().getDecimalMetadata(); |
| return field.append("DECIMAL(").append(decimalMetadata.getPrecision()).append(" , ") |
| .append(decimalMetadata.getScale()).append(")").toString(); |
| } else if (originalType == OriginalType.DATE) { |
| return field.append("DATE").toString(); |
| } else if (supportTimestamp && originalType == OriginalType.TIMESTAMP_MICROS) { |
| return field.append("TIMESTAMP").toString(); |
| } |
| |
| // TODO - fix the method naming here |
| return parquetPrimitiveTypeName.convert(new PrimitiveType.PrimitiveTypeNameConverter<String, RuntimeException>() { |
| @Override |
| public String convertBOOLEAN(PrimitiveType.PrimitiveTypeName primitiveTypeName) { |
| return "boolean"; |
| } |
| |
| @Override |
| public String convertINT32(PrimitiveType.PrimitiveTypeName primitiveTypeName) { |
| return "int"; |
| } |
| |
| @Override |
| public String convertINT64(PrimitiveType.PrimitiveTypeName primitiveTypeName) { |
| return "bigint"; |
| } |
| |
| @Override |
| public String convertINT96(PrimitiveType.PrimitiveTypeName primitiveTypeName) { |
| return "timestamp-millis"; |
| } |
| |
| @Override |
| public String convertFLOAT(PrimitiveType.PrimitiveTypeName primitiveTypeName) { |
| return "float"; |
| } |
| |
| @Override |
| public String convertDOUBLE(PrimitiveType.PrimitiveTypeName primitiveTypeName) { |
| return "double"; |
| } |
| |
| @Override |
| public String convertFIXED_LEN_BYTE_ARRAY(PrimitiveType.PrimitiveTypeName primitiveTypeName) { |
| return "binary"; |
| } |
| |
| @Override |
| public String convertBINARY(PrimitiveType.PrimitiveTypeName primitiveTypeName) { |
| if (originalType == OriginalType.UTF8 || originalType == OriginalType.ENUM) { |
| return "string"; |
| } else { |
| return "binary"; |
| } |
| } |
| }); |
| } else { |
| GroupType parquetGroupType = parquetType.asGroupType(); |
| OriginalType originalType = parquetGroupType.getOriginalType(); |
| if (originalType != null) { |
| switch (originalType) { |
| case LIST: |
| if (parquetGroupType.getFieldCount() != 1) { |
| throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); |
| } |
| Type elementType = parquetGroupType.getType(0); |
| if (!elementType.isRepetition(Type.Repetition.REPEATED)) { |
| throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); |
| } |
| return createHiveArray(elementType, parquetGroupType.getName(), supportTimestamp); |
| case MAP: |
| if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) { |
| throw new UnsupportedOperationException("Invalid map type " + parquetGroupType); |
| } |
| GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType(); |
| if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) |
| || !mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE) |
| || mapKeyValType.getFieldCount() != 2) { |
| throw new UnsupportedOperationException("Invalid map type " + parquetGroupType); |
| } |
| Type keyType = mapKeyValType.getType(0); |
| if (!keyType.isPrimitive() |
| || !keyType.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.BINARY) |
| || !keyType.getOriginalType().equals(OriginalType.UTF8)) { |
| throw new UnsupportedOperationException("Map key type must be binary (UTF8): " + keyType); |
| } |
| Type valueType = mapKeyValType.getType(1); |
| return createHiveMap(convertField(keyType, supportTimestamp), convertField(valueType, supportTimestamp)); |
| case ENUM: |
| case UTF8: |
| return "string"; |
| case MAP_KEY_VALUE: |
| // MAP_KEY_VALUE was supposed to be used to annotate key and |
| // value group levels in a |
| // MAP. However, that is always implied by the structure of |
| // MAP. Hence, PARQUET-113 |
| // dropped the requirement for having MAP_KEY_VALUE. |
| default: |
| throw new UnsupportedOperationException("Cannot convert Parquet type " + parquetType); |
| } |
| } else { |
| // if no original type then it's a record |
| return createHiveStruct(parquetGroupType.getFields(), supportTimestamp); |
| } |
| } |
| } |
| |
| /** |
| * Return a 'struct' Hive schema from a list of Parquet fields. |
| * |
| * @param parquetFields : list of parquet fields |
| * @return : Equivalent 'struct' Hive schema |
| */ |
| private static String createHiveStruct(List<Type> parquetFields, boolean supportTimestamp) { |
| StringBuilder struct = new StringBuilder(); |
| struct.append("STRUCT< "); |
| for (Type field : parquetFields) { |
| // TODO: struct field name is only translated to support special char($) |
| // We will need to extend it to other collection type |
| struct.append(hiveCompatibleFieldName(field.getName(), true)).append(" : "); |
| struct.append(convertField(field, supportTimestamp)).append(", "); |
| } |
| struct.delete(struct.length() - 2, struct.length()); // Remove the last |
| // ", " |
| struct.append(">"); |
| String finalStr = struct.toString(); |
| // Struct cannot have - in them. userstore_udr_entities has uuid in struct. This breaks the |
| // schema. |
| // HDrone sync should not fail because of this. |
| finalStr = finalStr.replaceAll("-", "_"); |
| return finalStr; |
| } |
| |
| private static String hiveCompatibleFieldName(String fieldName, boolean isNested) { |
| String result = fieldName; |
| if (isNested) { |
| result = ColumnNameXLator.translateNestedColumn(fieldName); |
| } |
| return tickSurround(result); |
| } |
| |
| private static String tickSurround(String result) { |
| if (!result.startsWith("`")) { |
| result = "`" + result; |
| } |
| if (!result.endsWith("`")) { |
| result = result + "`"; |
| } |
| return result; |
| } |
| |
| private static String removeSurroundingTick(String result) { |
| if (result.startsWith("`") && result.endsWith("`")) { |
| result = result.substring(1, result.length() - 1); |
| } |
| |
| return result; |
| } |
| |
| /** |
| * Create a 'Map' schema from Parquet map field. |
| */ |
| private static String createHiveMap(String keyType, String valueType) { |
| return "MAP< " + keyType + ", " + valueType + ">"; |
| } |
| |
| /** |
| * Create an Array Hive schema from equivalent parquet list type. |
| */ |
| private static String createHiveArray(Type elementType, String elementName, boolean supportTimestamp) { |
| StringBuilder array = new StringBuilder(); |
| array.append("ARRAY< "); |
| if (elementType.isPrimitive()) { |
| array.append(convertField(elementType, supportTimestamp)); |
| } else { |
| final GroupType groupType = elementType.asGroupType(); |
| final List<Type> groupFields = groupType.getFields(); |
| if (groupFields.size() > 1 || (groupFields.size() == 1 |
| && (elementType.getName().equals("array") || elementType.getName().equals(elementName + "_tuple")))) { |
| array.append(convertField(elementType, supportTimestamp)); |
| } else { |
| array.append(convertField(groupType.getFields().get(0), supportTimestamp)); |
| } |
| } |
| array.append(">"); |
| return array.toString(); |
| } |
| |
| public static boolean isSchemaTypeUpdateAllowed(String prevType, String newType) { |
| if (prevType == null || prevType.trim().isEmpty() || newType == null || newType.trim().isEmpty()) { |
| return false; |
| } |
| prevType = prevType.toLowerCase(); |
| newType = newType.toLowerCase(); |
| if (prevType.equals(newType)) { |
| return true; |
| } else if (prevType.equalsIgnoreCase("int") && newType.equalsIgnoreCase("bigint")) { |
| return true; |
| } else if (prevType.equalsIgnoreCase("float") && newType.equalsIgnoreCase("double")) { |
| return true; |
| } else { |
| return prevType.contains("struct") && newType.toLowerCase().contains("struct"); |
| } |
| } |
| |
| public static String generateSchemaString(MessageType storageSchema) throws IOException { |
| return generateSchemaString(storageSchema, Collections.EMPTY_LIST); |
| } |
| |
| public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip) throws IOException { |
| return generateSchemaString(storageSchema, colsToSkip, false); |
| } |
| |
| public static String generateSchemaString(MessageType storageSchema, List<String> colsToSkip, boolean supportTimestamp) throws IOException { |
| Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, supportTimestamp); |
| StringBuilder columns = new StringBuilder(); |
| for (Map.Entry<String, String> hiveSchemaEntry : hiveSchema.entrySet()) { |
| if (!colsToSkip.contains(removeSurroundingTick(hiveSchemaEntry.getKey()))) { |
| columns.append(hiveSchemaEntry.getKey()).append(" "); |
| columns.append(hiveSchemaEntry.getValue()).append(", "); |
| } |
| } |
| // Remove the last ", " |
| columns.delete(columns.length() - 2, columns.length()); |
| return columns.toString(); |
| } |
| |
| public static String generateCreateDDL(String tableName, MessageType storageSchema, HiveSyncConfig config, String inputFormatClass, |
| String outputFormatClass, String serdeClass, Map<String, String> serdeProperties, |
| Map<String, String> tableProperties) throws IOException { |
| Map<String, String> hiveSchema = convertParquetSchemaToHiveSchema(storageSchema, config.supportTimestamp); |
| String columns = generateSchemaString(storageSchema, config.partitionFields, config.supportTimestamp); |
| |
| List<String> partitionFields = new ArrayList<>(); |
| for (String partitionKey : config.partitionFields) { |
| String partitionKeyWithTicks = tickSurround(partitionKey); |
| partitionFields.add(new StringBuilder().append(partitionKeyWithTicks).append(" ") |
| .append(getPartitionKeyType(hiveSchema, partitionKeyWithTicks)).toString()); |
| } |
| |
| String partitionsStr = String.join(",", partitionFields); |
| StringBuilder sb = new StringBuilder("CREATE EXTERNAL TABLE IF NOT EXISTS "); |
| sb.append(HIVE_ESCAPE_CHARACTER).append(config.databaseName).append(HIVE_ESCAPE_CHARACTER) |
| .append(".").append(HIVE_ESCAPE_CHARACTER).append(tableName).append(HIVE_ESCAPE_CHARACTER); |
| sb.append("( ").append(columns).append(")"); |
| if (!config.partitionFields.isEmpty()) { |
| sb.append(" PARTITIONED BY (").append(partitionsStr).append(")"); |
| } |
| sb.append(" ROW FORMAT SERDE '").append(serdeClass).append("'"); |
| if (serdeProperties != null && !serdeProperties.isEmpty()) { |
| sb.append(" WITH SERDEPROPERTIES (").append(propertyToString(serdeProperties)).append(")"); |
| } |
| sb.append(" STORED AS INPUTFORMAT '").append(inputFormatClass).append("'"); |
| sb.append(" OUTPUTFORMAT '").append(outputFormatClass).append("' LOCATION '").append(config.basePath).append("'"); |
| |
| if (tableProperties != null && !tableProperties.isEmpty()) { |
| sb.append(" TBLPROPERTIES(").append(propertyToString(tableProperties)).append(")"); |
| } |
| return sb.toString(); |
| } |
| |
| private static String propertyToString(Map<String, String> properties) { |
| if (properties == null || properties.isEmpty()) { |
| return ""; |
| } |
| StringBuilder sb = new StringBuilder(); |
| boolean first = true; |
| for (Map.Entry<String, String> entry: properties.entrySet()) { |
| if (!first) { |
| sb.append(","); |
| } |
| sb.append("'").append(entry.getKey()).append("'='").append(entry.getValue()).append("'"); |
| first = false; |
| } |
| return sb.toString(); |
| } |
| |
| private static String getPartitionKeyType(Map<String, String> hiveSchema, String partitionKey) { |
| if (hiveSchema.containsKey(partitionKey)) { |
| return hiveSchema.get(partitionKey); |
| } |
| // Default the unknown partition fields to be String |
| // TODO - all partition fields should be part of the schema. datestr is treated as special. |
| // Dont do that |
| return "String"; |
| } |
| } |