// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.impala.analysis;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.google.common.base.Preconditions;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.LogicalTypeAnnotation.*;

import org.apache.impala.catalog.ArrayType;
import org.apache.impala.catalog.MapType;
import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.StructField;
import org.apache.impala.catalog.StructType;
import org.apache.impala.catalog.Type;
import org.apache.impala.common.AnalysisException;
import org.apache.impala.common.FileSystemUtil;

/**
* Provides extractParquetSchema() to extract a schema
* from a parquet file.
*
 * Because Parquet's Java package changed between Parquet 1.5 and 1.9, a second copy
 * of this file, with "parquet." replaced with "org.apache.parquet.", is generated by
 * the build system.
*/
class ParquetHelper {
private final static String ERROR_MSG =
"Failed to convert Parquet type\n%s\nto an Impala %s type:\n%s\n";

  /**
   * Reads the footer of the given HDFS file and returns the Parquet schema.
   * Throws an AnalysisException for any failure, such as failing to read the file
   * or failing to parse the contents.
   */
private static org.apache.parquet.schema.MessageType loadParquetSchema(Path pathToFile)
throws AnalysisException {
try {
FileSystem fs = pathToFile.getFileSystem(FileSystemUtil.getConfiguration());
if (!fs.isFile(pathToFile)) {
throw new AnalysisException("Cannot infer schema, path is not a file: " +
pathToFile);
}
} catch (IOException e) {
throw new AnalysisException("Failed to connect to filesystem:" + e);
} catch (IllegalArgumentException e) {
throw new AnalysisException(e.getMessage());
}
ParquetMetadata readFooter = null;
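    // Only the file footer is read here; the Parquet schema lives in the footer
    // metadata, so the row groups themselves are never touched.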
try {
readFooter = ParquetFileReader.readFooter(FileSystemUtil.getConfiguration(),
pathToFile);
} catch (FileNotFoundException e) {
throw new AnalysisException("File not found: " + e);
} catch (IOException e) {
throw new AnalysisException("Failed to open file as a parquet file: " + e);
} catch (RuntimeException e) {
// Parquet throws a generic RuntimeException when reading a non-parquet file
if (e.toString().contains("is not a Parquet file")) {
throw new AnalysisException("File is not a parquet file: " + pathToFile);
}
// otherwise, who knows what we caught, throw it back up
throw e;
}
return readFooter.getFileMetaData().getSchema();
}

  /**
* Converts a "primitive" Parquet type to an Impala type.
* A primitive type is a non-nested type with no annotations.
*/
private static Type convertPrimitiveParquetType(org.apache.parquet.schema.Type parquetType)
throws AnalysisException {
Preconditions.checkState(parquetType.isPrimitive());
PrimitiveType prim = parquetType.asPrimitiveType();
switch (prim.getPrimitiveTypeName()) {
case BINARY: return Type.STRING;
case BOOLEAN: return Type.BOOLEAN;
case DOUBLE: return Type.DOUBLE;
case FIXED_LEN_BYTE_ARRAY:
throw new AnalysisException(
"Unsupported parquet type FIXED_LEN_BYTE_ARRAY for field " +
parquetType.getName());
case FLOAT: return Type.FLOAT;
case INT32: return Type.INT;
case INT64: return Type.BIGINT;
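      // INT96 is the legacy physical encoding that Impala and Hive use for
      // nanosecond-precision timestamps.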
case INT96: return Type.TIMESTAMP;
default:
Preconditions.checkState(false, "Unexpected parquet primitive type: " +
prim.getPrimitiveTypeName());
return null;
}
}

  /**
   * Converts a Parquet group type to an Impala map Type. We support the standard
   * Parquet map representation as well as legacy representations. Legacy
   * representations are handled according to this specification:
   * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1
*
* Standard representation of a map in Parquet:
* <optional | required> group <name> (MAP) { <-- outerGroup is pointing at this
* repeated group key_value {
* required <key-type> key;
* <optional | required> <value-type> value;
* }
* }
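   *
   * Legacy layouts covered by the backward-compatibility rules linked above are also
   * accepted, for example a group annotated with MAP_KEY_VALUE (sketch adapted from
   * the spec):
   *   <optional | required> group <name> (MAP_KEY_VALUE) {
   *     repeated group map {
   *       required <key-type> key;
   *       <optional | required> <value-type> value;
   *     }
   *   }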
*/
private static MapType convertMap(org.apache.parquet.schema.GroupType outerGroup)
throws AnalysisException {
    if (outerGroup.getFieldCount() != 1) {
throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
"MAP", "The logical MAP type must have exactly 1 inner field."));
}
org.apache.parquet.schema.Type innerField = outerGroup.getType(0);
    if (!innerField.isRepetition(org.apache.parquet.schema.Type.Repetition.REPEATED)) {
throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
"MAP", "The logical MAP type must have a repeated inner field."));
}
if (innerField.isPrimitive()) {
throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
"MAP", "The inner field of the logical MAP type must be a group."));
}
org.apache.parquet.schema.GroupType innerGroup = innerField.asGroupType();
    // It does not matter whether innerGroup has an annotation (for example, it may be
    // annotated with MAP_KEY_VALUE); annotated and unannotated inner groups are
    // treated the same.
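    // The name of the repeated inner group is not checked either, so both the standard
    // "key_value" name and legacy names such as "map" are accepted.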
if (innerGroup.getFieldCount() != 2) {
throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
"MAP", "The inner field of the logical MAP type must have exactly 2 fields."));
}
org.apache.parquet.schema.Type key = innerGroup.getType(0);
if (!key.getName().equals("key")) {
throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
"MAP", "The name of the first field of the inner field of the logical MAP " +
"type must be 'key'"));
}
if (!key.isPrimitive()) {
throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
"MAP", "The key type of the logical MAP type must be primitive."));
}
org.apache.parquet.schema.Type value = innerGroup.getType(1);
if (!value.getName().equals("value")) {
throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
"MAP", "The name of the second field of the inner field of the logical MAP " +
"type must be 'value'"));
}
return new MapType(convertParquetType(key), convertParquetType(value));
}

  /**
* Converts a Parquet group type to an Impala struct Type.
*/
private static StructType convertStruct(org.apache.parquet.schema.GroupType outerGroup)
throws AnalysisException {
List<StructField> structFields = new ArrayList<>();
for (org.apache.parquet.schema.Type field: outerGroup.getFields()) {
StructField f = new StructField(field.getName(), convertParquetType(field));
structFields.add(f);
}
return new StructType(structFields);
}

  /**
   * Converts a Parquet group type to an Impala array Type. We handle the standard
   * representation as well as legacy representations for backwards compatibility.
* Legacy representations are handled according to this specification:
* https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules
*
* Standard representation of an array in Parquet:
* <optional | required> group <name> (LIST) { <-- outerGroup is pointing at this
* repeated group list {
* <optional | required> <element-type> element;
* }
* }
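   *
   * Legacy layouts covered by the backward-compatibility rules linked above are also
   * accepted, for example (sketches adapted from the spec):
   *   <optional | required> group <name> (LIST) {
   *     repeated <element-type> element;          <-- repeated field is not a group
   *   }
   *   <optional | required> group <name> (LIST) {
   *     repeated group element {                  <-- repeated group with multiple
   *       required <type1> field1;                    fields becomes a struct element
   *       required <type2> field2;
   *     }
   *   }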
*/
private static ArrayType convertArray(org.apache.parquet.schema.GroupType outerGroup)
throws AnalysisException {
if (outerGroup.getFieldCount() != 1) {
throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
"LIST", "The logical LIST type must have exactly 1 inner field."));
}
org.apache.parquet.schema.Type innerField = outerGroup.getType(0);
if (!innerField.isRepetition(org.apache.parquet.schema.Type.Repetition.REPEATED)) {
throw new AnalysisException(String.format(ERROR_MSG, outerGroup.toString(),
"LIST", "The inner field of the logical LIST type must be repeated."));
}
if (innerField.isPrimitive() || innerField.getOriginalType() != null) {
// From the Parquet Spec:
      // 1. If the repeated field is not a group then its type is the element type.
//
// If innerField is a group, but originalType is not null, the element type is
// based on the logical type.
return new ArrayType(convertParquetType(innerField));
}
org.apache.parquet.schema.GroupType innerGroup = innerField.asGroupType();
if (innerGroup.getFieldCount() != 1) {
// From the Parquet Spec:
      // 2. If the repeated field is a group with multiple fields, then its type is a
// struct.
return new ArrayType(convertStruct(innerGroup));
}
return new ArrayType(convertParquetType(innerGroup.getType(0)));
}

  /**
   * Converts a "logical" Parquet type to an Impala column type.
   * A Parquet type is considered logical when it has an annotation. The annotation is
   * stored as an "OriginalType" (surfaced by newer Parquet APIs as a
   * LogicalTypeAnnotation). The Parquet documentation refers to these as logical
   * types, so we use that terminology here.
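   *
   * For example (field names are illustrative), "optional binary name (UTF8)" is
   * converted to an Impala STRING, and a signed "optional int32 id (INT_32)" to an
   * Impala INT.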
*/
private static Type convertLogicalParquetType(
org.apache.parquet.schema.Type parquetType) throws AnalysisException {
    // The Parquet API is responsible for deducing the logical type when only the
    // converted type is set.
LogicalTypeAnnotation logicalType = parquetType.getLogicalTypeAnnotation();
if (logicalType instanceof ListLogicalTypeAnnotation) {
return convertArray(parquetType.asGroupType());
}
if (logicalType instanceof MapLogicalTypeAnnotation
|| logicalType instanceof MapKeyValueTypeAnnotation) {
// MAP_KEY_VALUE annotation should not be used any more. However, according to the
// Parquet spec, some existing data incorrectly uses MAP_KEY_VALUE in place of MAP.
// For backward-compatibility, a group annotated with MAP_KEY_VALUE that is not
// contained by a MAP-annotated group should be handled as a MAP-annotated group.
return convertMap(parquetType.asGroupType());
}
PrimitiveType prim = parquetType.asPrimitiveType();
if (prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.BINARY &&
(logicalType instanceof StringLogicalTypeAnnotation
|| logicalType instanceof EnumLogicalTypeAnnotation)) {
// UTF8 is the type annotation Parquet uses for strings
// ENUM is the type annotation Parquet uses to indicate that
      // the original data type, before conversion to parquet, had been an enum.
// Applications which do not have enumerated types (e.g. Impala)
// should interpret it as a string.
// We check to make sure it applies to BINARY to avoid errors if there is a bad
// annotation.
return Type.STRING;
}
if (prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64 &&
logicalType instanceof TimestampLogicalTypeAnnotation) {
return Type.TIMESTAMP;
}
if (prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32 &&
logicalType instanceof DateLogicalTypeAnnotation) {
return Type.DATE;
}
if ((prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT32
|| prim.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT64)
&& logicalType instanceof IntLogicalTypeAnnotation
&& ((IntLogicalTypeAnnotation) logicalType).isSigned() == true) {
// Map signed integer types to an supported Impala column type
switch (((IntLogicalTypeAnnotation) logicalType).getBitWidth()) {
case 8: return Type.TINYINT;
case 16: return Type.SMALLINT;
case 32: return Type.INT;
case 64: return Type.BIGINT;
}
}
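    // DECIMAL precision and scale come from the annotation itself; the Parquet spec
    // allows the value to be carried on INT32, INT64, BINARY or FIXED_LEN_BYTE_ARRAY,
    // so the physical type is not checked here.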
if (logicalType instanceof DecimalLogicalTypeAnnotation) {
DecimalLogicalTypeAnnotation decimal = (DecimalLogicalTypeAnnotation) logicalType;
return ScalarType.createDecimalType(decimal.getPrecision(), decimal.getScale());
}
throw new AnalysisException(
"Unsupported logical parquet type " + logicalType + " (primitive type is " +
prim.getPrimitiveTypeName().name() + ") for field " +
parquetType.getName());
}

  /**
* Converts a Parquet type into an Impala type.
*/
private static Type convertParquetType(org.apache.parquet.schema.Type field)
throws AnalysisException {
Type type = null;
    // TODO for 2.3: If a field is not annotated with LIST, it can sometimes still be
    // interpreted as an array. The following 2 examples should be interpreted as an
    // array of integers, but this is currently not done.
// 1. repeated int int_col;
// 2. required group int_arr {
// repeated group list {
// required int element;
// }
// }
if (field.getLogicalTypeAnnotation() != null) {
type = convertLogicalParquetType(field);
} else if (field.isPrimitive()) {
type = convertPrimitiveParquetType(field);
} else {
// If field is not primitive, it must be a struct.
type = convertStruct(field.asGroupType());
}
return type;
}

  /**
* Parses a Parquet file stored in HDFS and returns the corresponding Impala schema.
* This fails with an analysis exception if any errors occur reading the file,
* parsing the Parquet schema, or if the Parquet types cannot be represented in Impala.
*/
static List<ColumnDef> extractParquetSchema(HdfsUri location)
throws AnalysisException {
org.apache.parquet.schema.MessageType parquetSchema = loadParquetSchema(location.getPath());
List<org.apache.parquet.schema.Type> fields = parquetSchema.getFields();
List<ColumnDef> schema = new ArrayList<>();
for (org.apache.parquet.schema.Type field: fields) {
Type type = convertParquetType(field);
Preconditions.checkNotNull(type);
String colName = field.getName();
Map<ColumnDef.Option, Object> option = new HashMap<>();
option.put(ColumnDef.Option.COMMENT, "Inferred from Parquet file.");
schema.add(new ColumnDef(colName, new TypeDef(type), option));
}
return schema;
}
}