blob: 99058bc40096a4dba7d316990b952482ed9816aa [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.pig;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.Types;
import org.apache.pig.LoadPushDown.RequiredField;
import org.apache.pig.LoadPushDown.RequiredFieldList;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Type.Repetition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Optional.of;
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
/**
* Converts a Pig Schema into a Parquet schema
*
* Bags are converted into an optional group containing one repeated group field to preserve distinction between empty bag and null.
* Map are converted into an optional group containing one repeated group field of (key, value).
* anonymous fields are named field_{index}. (in most cases pig already gives them an alias val_{int}, so this rarely happens)
*/
public class PigSchemaConverter {
private static final Logger LOG = LoggerFactory.getLogger(PigSchemaConverter.class);
private static final String MAP_REPEATED_NAME = "key_value";
static final String ARRAY_VALUE_NAME = "value";
private ColumnAccess columnAccess;
public PigSchemaConverter() {
this(false);
}
/**
*
* @param columnIndexAccess toggle between name and index based access (default: false)
*/
public PigSchemaConverter(boolean columnIndexAccess) {
this.columnAccess = columnIndexAccess?new ColumnIndexAccess():new ColumnNameAccess();
}
/**
* @param pigSchemaString the pig schema to parse
* @return the parsed pig schema
*/
public static Schema parsePigSchema(String pigSchemaString) {
try {
return pigSchemaString == null ? null : Utils.getSchemaFromString(pigSchemaString);
} catch (ParserException e) {
throw new SchemaConversionException("could not parse Pig schema: " + pigSchemaString, e);
}
}
interface ColumnAccess {
List<Type> filterTupleSchema(GroupType schemaToFilter, Schema pigSchema, RequiredFieldList requiredFieldsList);
}
class ColumnIndexAccess implements ColumnAccess {
@Override
public List<Type> filterTupleSchema(GroupType schemaToFilter, Schema pigSchema, RequiredFieldList requiredFieldsList) {
List<Type> newFields = new ArrayList<Type>();
List<Pair<FieldSchema,Integer>> indexedFields = new ArrayList<Pair<FieldSchema,Integer>>();
try {
if(requiredFieldsList == null) {
int index = 0;
for(FieldSchema fs : pigSchema.getFields()) {
indexedFields.add(new Pair<FieldSchema, Integer>(fs, index++));
}
} else {
for(RequiredField rf : requiredFieldsList.getFields()) {
indexedFields.add(new Pair<FieldSchema, Integer>(pigSchema.getField(rf.getAlias()), rf.getIndex()));
}
}
for (Pair<FieldSchema, Integer> p : indexedFields) {
FieldSchema fieldSchema = pigSchema.getField(p.first.alias);
if (p.second < schemaToFilter.getFieldCount()) {
Type type = schemaToFilter.getFields().get(p.second);
newFields.add(filter(type, fieldSchema));
}
}
} catch (FrontendException e) {
throw new RuntimeException("Failed to filter requested fields", e);
}
return newFields;
}
}
class ColumnNameAccess implements ColumnAccess {
@Override
public List<Type> filterTupleSchema(GroupType schemaToFilter, Schema requestedPigSchema, RequiredFieldList requiredFieldsList) {
List<FieldSchema> fields = requestedPigSchema.getFields();
List<Type> newFields = new ArrayList<Type>();
for (int i = 0; i < fields.size(); i++) {
FieldSchema fieldSchema = fields.get(i);
String name = name(fieldSchema.alias, "field_"+i);
if (schemaToFilter.containsField(name)) {
newFields.add(filter(schemaToFilter.getType(name), fieldSchema));
}
}
return newFields;
}
}
/**
* @param pigSchema the pig schema to turn into a string representation
* @return the sctring representation of the schema
*/
static String pigSchemaToString(Schema pigSchema) {
final String pigSchemaString = pigSchema.toString();
return pigSchemaString.substring(1, pigSchemaString.length() - 1);
}
public static RequiredFieldList deserializeRequiredFieldList(String requiredFieldString) {
if(requiredFieldString == null) {
return null;
}
try {
return (RequiredFieldList) ObjectSerializer.deserialize(requiredFieldString);
} catch (IOException e) {
throw new RuntimeException("Failed to deserialize pushProjection", e);
}
}
static String serializeRequiredFieldList(RequiredFieldList requiredFieldList) {
try {
return ObjectSerializer.serialize(requiredFieldList);
} catch (IOException e) {
throw new RuntimeException("Failed to searlize required fields.", e);
}
}
/**
* converts a parquet schema into a pig schema
* @param parquetSchema the parquet schema to convert to Pig schema
* @return the resulting schema
*/
public Schema convert(MessageType parquetSchema) {
return convertFields(parquetSchema.getFields());
}
/**
* @param parquetType the type to convert
* @return the resulting schema (containing one field)
*/
public Schema convertField(Type parquetType) {
return convertFields(Arrays.asList(parquetType));
}
private Schema convertFields(List<Type> parquetFields) {
List<FieldSchema> fields = new ArrayList<Schema.FieldSchema>();
for (Type parquetType : parquetFields) {
try{
FieldSchema innerfieldSchema = getFieldSchema(parquetType);
if (parquetType.isRepetition(Repetition.REPEATED)) {
Schema bagSchema = new Schema(Arrays.asList(innerfieldSchema));
fields.add(new FieldSchema(null, bagSchema, DataType.BAG));
} else {
fields.add(innerfieldSchema);
}
}
catch (FrontendException fe) {
throw new SchemaConversionException("can't convert "+ parquetType, fe);
}
}
return new Schema(fields);
}
private FieldSchema getSimpleFieldSchema(final String fieldName, Type parquetType)
throws FrontendException {
final PrimitiveTypeName parquetPrimitiveTypeName =
parquetType.asPrimitiveType().getPrimitiveTypeName();
final LogicalTypeAnnotation logicalTypeAnnotation = parquetType.getLogicalTypeAnnotation();
return parquetPrimitiveTypeName.convert(
new PrimitiveTypeNameConverter<Schema.FieldSchema, FrontendException>() {
@Override
public FieldSchema convertFLOAT(PrimitiveTypeName primitiveTypeName)
throws FrontendException {
return new FieldSchema(fieldName, null, DataType.FLOAT);
}
@Override
public FieldSchema convertDOUBLE(PrimitiveTypeName primitiveTypeName)
throws FrontendException {
return new FieldSchema(fieldName, null, DataType.DOUBLE);
}
@Override
public FieldSchema convertINT32(PrimitiveTypeName primitiveTypeName)
throws FrontendException {
return new FieldSchema(fieldName, null, DataType.INTEGER);
}
@Override
public FieldSchema convertINT64(PrimitiveTypeName primitiveTypeName)
throws FrontendException {
return new FieldSchema(fieldName, null, DataType.LONG);
}
@Override
public FieldSchema convertINT96(PrimitiveTypeName primitiveTypeName)
throws FrontendException {
LOG.warn("Converting type " + primitiveTypeName + " to bytearray");
return new FieldSchema(fieldName, null, DataType.BYTEARRAY);
}
@Override
public FieldSchema convertFIXED_LEN_BYTE_ARRAY(
PrimitiveTypeName primitiveTypeName) throws FrontendException {
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
return new FieldSchema(fieldName, null, DataType.BIGDECIMAL);
} else {
return new FieldSchema(fieldName, null, DataType.BYTEARRAY);
}
}
@Override
public FieldSchema convertBOOLEAN(PrimitiveTypeName primitiveTypeName)
throws FrontendException {
return new FieldSchema(fieldName, null, DataType.BOOLEAN);
}
@Override
public FieldSchema convertBINARY(PrimitiveTypeName primitiveTypeName)
throws FrontendException {
if (logicalTypeAnnotation instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
return new FieldSchema(fieldName, null, DataType.CHARARRAY);
} else {
return new FieldSchema(fieldName, null, DataType.BYTEARRAY);
}
}
});
}
/*
* RuntimeException class to workaround throwing checked FrontendException in logical type visitors.
* Wrap the FrontendException inside the visitor in an inner catch block, and rethrow it outside of the visitor
*/
private static final class FrontendExceptionWrapper extends RuntimeException {
final FrontendException frontendException;
FrontendExceptionWrapper(FrontendException frontendException) {
this.frontendException = frontendException;
}
}
private FieldSchema getComplexFieldSchema(String fieldName, Type parquetType)
throws FrontendException {
GroupType parquetGroupType = parquetType.asGroupType();
LogicalTypeAnnotation logicalTypeAnnotation = parquetGroupType.getLogicalTypeAnnotation();
if (logicalTypeAnnotation != null) {
try {
return logicalTypeAnnotation.accept(new LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<FieldSchema>() {
@Override
public Optional<FieldSchema> visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation mapLogicalType) {
try {
// verify that its a map
if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) {
throw new SchemaConversionException("Invalid map type " + parquetGroupType);
}
GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
if (!mapKeyValType.isRepetition(Repetition.REPEATED) ||
(mapKeyValType.getLogicalTypeAnnotation() != null && !mapKeyValType.getLogicalTypeAnnotation().equals(LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance())) ||
mapKeyValType.getFieldCount() != 2) {
throw new SchemaConversionException("Invalid map type " + parquetGroupType);
}
// if value is not primitive wrap it in a tuple
Type valueType = mapKeyValType.getType(1);
Schema s = convertField(valueType);
s.getField(0).alias = null;
return of(new FieldSchema(fieldName, s, DataType.MAP));
} catch (FrontendException e) {
throw new FrontendExceptionWrapper(e);
}
}
@Override
public Optional<FieldSchema> visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation listLogicalType) {
try {
Type type = parquetGroupType.getType(0);
if (parquetGroupType.getFieldCount() != 1 || type.isPrimitive()) {
// an array is effectively a bag
Schema primitiveSchema = new Schema(getSimpleFieldSchema(parquetGroupType.getFieldName(0), type));
Schema tupleSchema = new Schema(new FieldSchema(ARRAY_VALUE_NAME, primitiveSchema, DataType.TUPLE));
return of(new FieldSchema(fieldName, tupleSchema, DataType.BAG));
}
GroupType tupleType = parquetGroupType.getType(0).asGroupType();
if (!tupleType.isRepetition(Repetition.REPEATED)) {
throw new SchemaConversionException("Invalid list type " + parquetGroupType);
}
Schema tupleSchema = new Schema(new FieldSchema(tupleType.getName(), convertFields(tupleType.getFields()), DataType.TUPLE));
return of(new FieldSchema(fieldName, tupleSchema, DataType.BAG));
} catch (FrontendException e) {
throw new FrontendExceptionWrapper(e);
}
}
}).orElseThrow(() -> new SchemaConversionException("Unexpected original type for " + parquetType + ": " + logicalTypeAnnotation));
} catch (FrontendExceptionWrapper e) {
throw e.frontendException;
}
} else {
// if original type is not set, we assume it to be tuple
return new FieldSchema(fieldName, convertFields(parquetGroupType.getFields()), DataType.TUPLE);
}
}
private FieldSchema getFieldSchema(Type parquetType) throws FrontendException {
final String fieldName = parquetType.getName();
if (parquetType.isPrimitive()) {
return getSimpleFieldSchema(fieldName, parquetType);
} else {
return getComplexFieldSchema(fieldName, parquetType);
}
}
/**
*
* @param pigSchema the pig schema
* @return the resulting Parquet schema
*/
public MessageType convert(Schema pigSchema) {
return new MessageType("pig_schema", convertTypes(pigSchema));
}
private Type[] convertTypes(Schema pigSchema) {
List<FieldSchema> fields = pigSchema.getFields();
Type[] types = new Type[fields.size()];
for (int i = 0; i < types.length; i++) {
types[i] = convert(fields.get(i), i);
}
return types;
}
private Type convert(FieldSchema fieldSchema, String defaultAlias) {
String name = name(fieldSchema.alias, defaultAlias);
return convertWithName(fieldSchema, name);
}
private Type convertWithName(FieldSchema fieldSchema, String name) {
try {
switch (fieldSchema.type) {
case DataType.BAG:
return convertBag(name, fieldSchema);
case DataType.TUPLE:
return convertTuple(name, fieldSchema, Repetition.OPTIONAL);
case DataType.MAP:
return convertMap(name, fieldSchema);
case DataType.BOOLEAN:
return primitive(name, PrimitiveTypeName.BOOLEAN);
case DataType.CHARARRAY:
return primitive(name, PrimitiveTypeName.BINARY, stringType());
case DataType.INTEGER:
return primitive(name, PrimitiveTypeName.INT32);
case DataType.LONG:
return primitive(name, PrimitiveTypeName.INT64);
case DataType.FLOAT:
return primitive(name, PrimitiveTypeName.FLOAT);
case DataType.DOUBLE:
return primitive(name, PrimitiveTypeName.DOUBLE);
case DataType.DATETIME:
throw new UnsupportedOperationException();
case DataType.BYTEARRAY:
return primitive(name, PrimitiveTypeName.BINARY);
default:
throw new SchemaConversionException("Unknown type " + fieldSchema.type + " " + DataType.findTypeName(fieldSchema.type));
}
} catch (FrontendException e) {
throw new SchemaConversionException("can't convert "+fieldSchema, e);
}
}
private Type convert(FieldSchema fieldSchema, int index) {
return convert(fieldSchema, "field_"+index);
}
/**
*
* @param name
* @param fieldSchema
* @return an optional group containing one repeated group field
* @throws FrontendException
*/
private GroupType convertBag(String name, FieldSchema fieldSchema) throws FrontendException {
FieldSchema innerField = fieldSchema.schema.getField(0);
return ConversionPatterns.listType(
Repetition.OPTIONAL,
name,
convertTuple(name(innerField.alias, "bag"), innerField, Repetition.REPEATED));
}
private String name(String fieldAlias, String defaultName) {
return fieldAlias == null ? defaultName : fieldAlias;
}
private Type primitive(String name, PrimitiveTypeName primitive, LogicalTypeAnnotation logicalTypeAnnotation) {
return Types.primitive(primitive, Repetition.OPTIONAL).as(logicalTypeAnnotation).named(name);
}
private PrimitiveType primitive(String name, PrimitiveTypeName primitive) {
return Types.primitive(primitive, Repetition.OPTIONAL).named(name);
}
/**
*
* @param alias
* @param fieldSchema
* @return an optional group containing one repeated group field (key, value)
* @throws FrontendException
*/
private GroupType convertMap(String alias, FieldSchema fieldSchema) {
Schema innerSchema = fieldSchema.schema;
if (innerSchema == null || innerSchema.size() != 1) {
throw new SchemaConversionException("Invalid map Schema, schema should contain exactly one field: " + fieldSchema);
}
FieldSchema innerField = null;
try {
innerField = innerSchema.getField(0);
} catch (FrontendException fe) {
throw new SchemaConversionException("Invalid map schema, cannot infer innerschema: ", fe);
}
Type convertedValue = convertWithName(innerField, "value");
return ConversionPatterns.stringKeyMapType(Repetition.OPTIONAL, alias, name(innerField.alias, MAP_REPEATED_NAME),
convertedValue);
}
private GroupType convertTuple(String alias, FieldSchema field, Repetition repetition) {
return new GroupType(repetition, alias, convertTypes(field.schema));
}
/**
* filters a Parquet schema based on a pig schema for projection
* @param schemaToFilter the schema to be filter
* @param requestedPigSchema the pig schema to filter it with
* @return the resulting filtered schema
*/
public MessageType filter(MessageType schemaToFilter, Schema requestedPigSchema) {
return filter(schemaToFilter, requestedPigSchema, null);
}
/**
* filters a Parquet schema based on a pig schema for projection
* @param schemaToFilter the schema to be filter
* @param requestedPigSchema the pig schema to filter it with
* @param requiredFieldList projected required fields
* @return the resulting filtered schema
*/
public MessageType filter(MessageType schemaToFilter, Schema requestedPigSchema, RequiredFieldList requiredFieldList) {
try {
if (LOG.isDebugEnabled()) LOG.debug("filtering schema:\n" + schemaToFilter + "\nwith requested pig schema:\n " + requestedPigSchema);
List<Type> result = columnAccess.filterTupleSchema(schemaToFilter, requestedPigSchema, requiredFieldList);
if (LOG.isDebugEnabled()) LOG.debug("schema:\n" + schemaToFilter + "\nfiltered to:\n" + result);
return new MessageType(schemaToFilter.getName(), result);
} catch (RuntimeException e) {
throw new RuntimeException("can't filter " + schemaToFilter + " with " + requestedPigSchema, e);
}
}
private Type filter(Type type, FieldSchema fieldSchema) {
if (LOG.isDebugEnabled()) LOG.debug("filtering type:\n" + type + "\nwith:\n " + fieldSchema);
try {
switch (fieldSchema.type) {
case DataType.BAG:
return filterBag(type.asGroupType(), fieldSchema);
case DataType.MAP:
return filterMap(type.asGroupType(), fieldSchema);
case DataType.TUPLE:
return filterTuple(type.asGroupType(), fieldSchema);
default:
return type;
}
} catch (FrontendException e) {
throw new SchemaConversionException("can't filter " + type + " with " + fieldSchema, e);
} catch (RuntimeException e) {
throw new RuntimeException("can't filter " + type + " with " + fieldSchema, e);
}
}
private Type filterTuple(GroupType tupleType, FieldSchema tupleFieldSchema) throws FrontendException {
if (LOG.isDebugEnabled()) LOG.debug("filtering TUPLE schema:\n" + tupleType + "\nwith:\n " + tupleFieldSchema);
return tupleType.withNewFields(columnAccess.filterTupleSchema(tupleType, tupleFieldSchema.schema, null));
}
private Type filterMap(GroupType mapType, FieldSchema mapFieldSchema) throws FrontendException {
if (LOG.isDebugEnabled()) LOG.debug("filtering MAP schema:\n" + mapType + "\nwith:\n " + mapFieldSchema);
if (mapType.getFieldCount() != 1) {
throw new RuntimeException("not unwrapping the right type, this should be a Map: " + mapType);
}
GroupType nested = mapType.getType(0).asGroupType();
if (nested.getFieldCount() != 2) {
throw new RuntimeException("this should be a Map Key/Value: " + mapType);
}
FieldSchema innerField = mapFieldSchema.schema.getField(0);
return mapType.withNewFields(nested.withNewFields(nested.getType(0), filter(nested.getType(1), innerField)));
}
private Type filterBag(GroupType bagType, FieldSchema bagFieldSchema) throws FrontendException {
if (LOG.isDebugEnabled()) LOG.debug("filtering BAG schema:\n" + bagType + "\nwith:\n " + bagFieldSchema);
if (bagType.getFieldCount() != 1) {
throw new RuntimeException("not unwrapping the right type, this should be a Bag: " + bagType);
}
Type nested = bagType.getType(0);
FieldSchema innerField = bagFieldSchema.schema.getField(0);
if (nested.isPrimitive() || nested.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation
|| nested.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
// Bags always contain tuples => we skip the extra tuple that was inserted in that case.
innerField = innerField.schema.getField(0);
}
return bagType.withNewFields(filter(nested, innerField));
}
}