blob: 50f9ebcc3df023136edcc531b32bbe35d95e7ebb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.pig;
import static org.apache.parquet.pig.PigSchemaConverter.parsePigSchema;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.LoadPushDown.RequiredFieldList;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.pig.convert.TupleRecordMaterializer;
import org.apache.parquet.schema.IncompatibleSchemaModificationException;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Read support for Pig Tuple.
 * A Pig MetaDataBlock is expected in the initialization call.
 *
 * <p>The requested Pig schema, the pushed-down required fields and the
 * column-index-access / elephant-bird flags are all read from the job
 * {@link Configuration} using the keys declared below.
 */
public class TupleReadSupport extends ReadSupport<Tuple> {
  /** Configuration key holding the Pig schema requested by the user. */
  static final String PARQUET_PIG_SCHEMA = "parquet.pig.schema";
  /** Configuration key of the boolean flag enabling column index access (defaults to false). */
  static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.private.pig.column.index.access";
  /** Configuration key holding the serialized {@link RequiredFieldList} from pushProjection. */
  static final String PARQUET_PIG_REQUIRED_FIELDS = "parquet.private.pig.required.fields";
  /** Configuration key of the boolean flag enabling elephant-bird compatibility (defaults to false). */
  static final String PARQUET_PIG_ELEPHANT_BIRD_COMPATIBLE = "parquet.pig.elephantbird.compatible";

  private static final Logger LOG = LoggerFactory.getLogger(TupleReadSupport.class);
  /** Converter used to derive a Pig schema from a Parquet schema when no Pig metadata is available. */
  private static final PigSchemaConverter pigSchemaConverter = new PigSchemaConverter(false);

  /**
   * @param configuration the configuration for the current job
   * @return the pig schema requested by the user or null if none.
   */
  static Schema getPigSchema(Configuration configuration) {
    return parsePigSchema(configuration.get(PARQUET_PIG_SCHEMA));
  }

  /**
   * @param configuration configuration for the current job
   * @return List of required fields from pushProjection, or null when the key is not set
   * @throws RuntimeException if the stored value can not be deserialized; the
   *         underlying {@link IOException} is attached as the cause
   */
  static RequiredFieldList getRequiredFields(Configuration configuration) {
    String requiredFieldString = configuration.get(PARQUET_PIG_REQUIRED_FIELDS);
    if (requiredFieldString == null) {
      return null;
    }
    try {
      return (RequiredFieldList) ObjectSerializer.deserialize(requiredFieldString);
    } catch (IOException iOException) {
      // keep the original failure as the cause so the root problem stays diagnosable
      throw new RuntimeException("Failed to deserialize pushProjection", iOException);
    }
  }

  /**
   * Builds a single Pig schema out of the Pig schemas recorded in several files
   * by merging them one after the other.
   *
   * @param fileSchema the parquet schema from the file
   * @param keyValueMetaData the extra meta data from the files
   * @return the pig schema according to the file; converted from {@code fileSchema}
   *         when the meta data carries no Pig schema
   * @throws ParquetDecodingException if one of the schemas can not be merged
   */
  static Schema getPigSchemaFromMultipleFiles(MessageType fileSchema, Map<String, Set<String>> keyValueMetaData) {
    Set<String> pigSchemas = PigMetaData.getPigSchemas(keyValueMetaData);
    if (pigSchemas == null) {
      return pigSchemaConverter.convert(fileSchema);
    }
    Schema mergedPigSchema = null;
    for (String pigSchemaString : pigSchemas) {
      try {
        mergedPigSchema = union(mergedPigSchema, parsePigSchema(pigSchemaString));
      } catch (FrontendException e) {
        throw new ParquetDecodingException("can not merge " + pigSchemaString + " into " + mergedPigSchema, e);
      }
    }
    return mergedPigSchema;
  }

  /**
   * @param fileSchema the parquet schema from the file
   * @param keyValueMetaData the extra meta data from the file
   * @return the pig schema according to the file; converted from {@code fileSchema}
   *         when the meta data carries no Pig meta data
   */
  static Schema getPigSchemaFromFile(MessageType fileSchema, Map<String, String> keyValueMetaData) {
    PigMetaData pigMetaData = PigMetaData.fromMetaData(keyValueMetaData);
    if (pigMetaData == null) {
      return pigSchemaConverter.convert(fileSchema);
    }
    return parsePigSchema(pigMetaData.getPigSchema());
  }

  /**
   * Merges two Pig schemas field by field, matching fields on their alias.
   * Fields of {@code merged} come first (merged with their counterpart when one
   * exists), followed by the fields only present in {@code pigSchema}.
   *
   * @param merged the schema accumulated so far, may be null
   * @param pigSchema the schema to merge in, may be null
   * @return the other schema when one side is null, otherwise a new merged schema
   * @throws FrontendException if a field lookup by alias fails
   */
  private static Schema union(Schema merged, Schema pigSchema) throws FrontendException {
    if (merged == null) {
      return pigSchema;
    }
    if (pigSchema == null) {
      // nothing to merge in: keep what is already known instead of failing with an NPE
      return merged;
    }
    List<FieldSchema> fields = new ArrayList<FieldSchema>();
    // merging existing fields
    for (FieldSchema fieldSchema : merged.getFields()) {
      FieldSchema newFieldSchema = pigSchema.getField(fieldSchema.alias);
      if (newFieldSchema == null) {
        fields.add(fieldSchema);
      } else {
        fields.add(union(fieldSchema, newFieldSchema));
      }
    }
    // adding new fields
    for (FieldSchema newFieldSchema : pigSchema.getFields()) {
      FieldSchema oldFieldSchema = merged.getField(newFieldSchema.alias);
      if (oldFieldSchema == null) {
        fields.add(newFieldSchema);
      }
    }
    return new Schema(fields);
  }

  /**
   * Merges two field schemas that must share the same alias and type; their
   * nested schemas are merged recursively.
   *
   * @param mergedFieldSchema the field accumulated so far
   * @param newFieldSchema the field to merge in
   * @return a new field with the shared alias and type and the merged nested schema
   * @throws IncompatibleSchemaModificationException if alias or type differ
   * @throws SchemaConversionException if the merged field can not be built
   */
  private static FieldSchema union(FieldSchema mergedFieldSchema, FieldSchema newFieldSchema) {
    if (!mergedFieldSchema.alias.equals(newFieldSchema.alias)
        || mergedFieldSchema.type != newFieldSchema.type) {
      // name both sides of the conflict so the message is actionable
      throw new IncompatibleSchemaModificationException(
          "Incompatible Pig schema change: " + mergedFieldSchema + " can not accept " + newFieldSchema);
    }
    try {
      return new FieldSchema(
          mergedFieldSchema.alias,
          union(mergedFieldSchema.schema, newFieldSchema.schema),
          mergedFieldSchema.type);
    } catch (FrontendException e) {
      throw new SchemaConversionException(e);
    }
  }

  /**
   * Decides which Parquet schema to request for reading.
   *
   * @param initContext the initialization context giving access to the job
   *        configuration and the file schema
   * @return a read context requesting the whole file schema when no Pig schema
   *         is configured, otherwise the file schema filtered by the requested
   *         Pig schema and required fields
   */
  @Override
  public ReadContext init(InitContext initContext) {
    Configuration configuration = initContext.getConfiguration();
    Schema pigSchema = getPigSchema(configuration);
    RequiredFieldList requiredFields = getRequiredFields(configuration);
    boolean columnIndexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
    if (pigSchema == null) {
      return new ReadContext(initContext.getFileSchema());
    }
    // project the file schema according to the requested Pig schema
    MessageType parquetRequestedSchema =
        new PigSchemaConverter(columnIndexAccess).filter(initContext.getFileSchema(), pigSchema, requiredFields);
    return new ReadContext(parquetRequestedSchema);
  }

  /**
   * Creates the materializer that turns Parquet records into Pig tuples.
   *
   * @param configuration the configuration for the current job
   * @param keyValueMetaData the extra meta data from the file (not used here)
   * @param fileSchema the parquet schema from the file (not used here)
   * @param readContext the context returned by {@link #init(InitContext)}
   * @return the materializer for the requested schema
   * @throws ParquetDecodingException if no Pig schema is set in the configuration
   */
  @Override
  public RecordMaterializer<Tuple> prepareForRead(
      Configuration configuration,
      Map<String, String> keyValueMetaData,
      MessageType fileSchema,
      ReadContext readContext) {
    MessageType requestedSchema = readContext.getRequestedSchema();
    Schema requestedPigSchema = getPigSchema(configuration);
    if (requestedPigSchema == null) {
      throw new ParquetDecodingException("Missing Pig schema: ParquetLoader sets the schema in the job conf");
    }
    boolean elephantBirdCompatible = configuration.getBoolean(PARQUET_PIG_ELEPHANT_BIRD_COMPATIBLE, false);
    boolean columnIndexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
    if (elephantBirdCompatible) {
      LOG.info("Numbers will default to 0 instead of NULL; Boolean will be converted to Int");
    }
    return new TupleRecordMaterializer(requestedSchema, requestedPigSchema, elephantBirdCompatible, columnIndexAccess);
  }
}