blob: e13bc64dbe1658cf1d51c4010ebf28b4d0d3bc13 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.pig;
import static java.util.Arrays.asList;
import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.setInputPaths;
import static org.apache.parquet.hadoop.util.ContextUtil.getConfiguration;
import static org.apache.parquet.pig.PigSchemaConverter.parsePigSchema;
import static org.apache.parquet.pig.PigSchemaConverter.pigSchemaToString;
import static org.apache.parquet.pig.PigSchemaConverter.serializeRequiredFieldList;
import static org.apache.parquet.pig.TupleReadSupport.PARQUET_PIG_SCHEMA;
import static org.apache.parquet.pig.TupleReadSupport.PARQUET_PIG_REQUIRED_FIELDS;
import static org.apache.parquet.pig.TupleReadSupport.PARQUET_COLUMN_INDEX_ACCESS;
import static org.apache.parquet.pig.TupleReadSupport.getPigSchemaFromMultipleFiles;
import static org.apache.parquet.filter2.predicate.FilterApi.*;
import java.io.IOException;
import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
import org.apache.parquet.filter2.predicate.Operators;
import org.apache.parquet.io.api.Binary;
import org.apache.pig.Expression;
import org.apache.pig.Expression.BetweenExpression;
import org.apache.pig.Expression.InExpression;
import org.apache.pig.Expression.UnaryExpression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPredicatePushdown;
import org.apache.pig.LoadPushDown;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.parser.ParserException;
import static org.apache.pig.Expression.BinaryExpression;
import static org.apache.pig.Expression.Column;
import static org.apache.pig.Expression.Const;
import static org.apache.pig.Expression.OpType;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.metadata.GlobalMetaData;
import org.apache.parquet.io.ParquetDecodingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Pig Loader for the Parquet file format.
*/
public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDown, LoadPredicatePushdown {
private static final Logger LOG = LoggerFactory.getLogger(ParquetLoader.class);
public static final String ENABLE_PREDICATE_FILTER_PUSHDOWN = "parquet.pig.predicate.pushdown.enable";
private static final boolean DEFAULT_PREDICATE_PUSHDOWN_ENABLED = false;
// Using a weak hash map will ensure that the cache will be gc'ed when there is memory pressure
static final Map<String, Reference<ParquetInputFormat<Tuple>>> inputFormatCache = new WeakHashMap<String, Reference<ParquetInputFormat<Tuple>>>();
private Schema requestedSchema;
private boolean columnIndexAccess;
private String location;
private boolean setLocationHasBeenCalled = false;
private RecordReader<Void, Tuple> reader;
private ParquetInputFormat<Tuple> parquetInputFormat;
private Schema schema;
private RequiredFieldList requiredFieldList = null;
protected String signature;
/**
* To read the content in its original schema
*/
public ParquetLoader() {
this(null);
}
/**
* To read only a subset of the columns in the file
* @param requestedSchemaStr a subset of the original pig schema in the file
*/
public ParquetLoader(String requestedSchemaStr) {
this(parsePigSchema(requestedSchemaStr), false);
}
/**
* To read only a subset of the columns in the file optionally assigned by
* column positions. Using column positions allows for renaming the fields
* and is more inline with the "schema-on-read" approach to accessing file
* data.
*
* Example:
* File Schema: 'c1:int, c2:float, c3:double, c4:long'
* ParquetLoader('n1:int, n2:float, n3:double, n4:long', 'true');
*
* This will use the names provided in the requested schema and assign them
* to column positions indicated by order.
*
* @param requestedSchemaStr a subset of the original pig schema in the file
* @param columnIndexAccess use column index positions as opposed to name (default: false)
*/
public ParquetLoader(String requestedSchemaStr, String columnIndexAccess) {
this(parsePigSchema(requestedSchemaStr), Boolean.parseBoolean(columnIndexAccess));
}
/**
* Use the provided schema to access the underlying file data.
*
* The same as the string based constructor but for programmatic use.
*
* @param requestedSchema a subset of the original pig schema in the file
* @param columnIndexAccess use column index positions as opposed to name (default: false)
*/
public ParquetLoader(Schema requestedSchema, boolean columnIndexAccess) {
this.requestedSchema = requestedSchema;
this.columnIndexAccess = columnIndexAccess;
}
@Override
public void setLocation(String location, Job job) throws IOException {
if (LOG.isDebugEnabled()) {
String jobToString = String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName());
LOG.debug("LoadFunc.setLocation({}, {})", location, jobToString);
}
setInput(location, job);
}
private void setInput(String location, Job job) throws IOException {
this.setLocationHasBeenCalled = true;
this.location = location;
setInputPaths(job, location);
//This is prior to load because the initial value comes from the constructor
//not file metadata or pig framework and would get overwritten in initSchema().
if(UDFContext.getUDFContext().isFrontend()) {
storeInUDFContext(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
}
schema = PigSchemaConverter.parsePigSchema(getPropertyFromUDFContext(PARQUET_PIG_SCHEMA));
requiredFieldList = PigSchemaConverter.deserializeRequiredFieldList(getPropertyFromUDFContext(PARQUET_PIG_REQUIRED_FIELDS));
columnIndexAccess = Boolean.parseBoolean(getPropertyFromUDFContext(PARQUET_COLUMN_INDEX_ACCESS));
initSchema(job);
if(UDFContext.getUDFContext().isFrontend()) {
//Setting for task-side loading via initSchema()
storeInUDFContext(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
storeInUDFContext(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
}
//Used by task-side loader via TupleReadSupport
getConfiguration(job).set(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
getConfiguration(job).set(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
getConfiguration(job).set(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
FilterPredicate filterPredicate = (FilterPredicate) getFromUDFContext(ParquetInputFormat.FILTER_PREDICATE);
if(filterPredicate != null) {
ParquetInputFormat.setFilterPredicate(getConfiguration(job), filterPredicate);
}
}
@Override
public InputFormat<Void, Tuple> getInputFormat() throws IOException {
LOG.debug("LoadFunc.getInputFormat()");
return getParquetInputFormat();
}
private void checkSetLocationHasBeenCalled() {
if (!setLocationHasBeenCalled) {
throw new IllegalStateException("setLocation() must be called first");
}
}
private static class UnregisteringParquetInputFormat extends ParquetInputFormat<Tuple> {
private final String location;
public UnregisteringParquetInputFormat(String location) {
super(TupleReadSupport.class);
this.location = location;
}
@Override
public RecordReader<Void, Tuple> createRecordReader(
InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
// for local mode we don't want to keep that around
inputFormatCache.remove(location);
return super.createRecordReader(inputSplit, taskAttemptContext);
}
};
private ParquetInputFormat<Tuple> getParquetInputFormat() throws ParserException {
checkSetLocationHasBeenCalled();
if (parquetInputFormat == null) {
// unfortunately Pig will create many Loaders, so we cache the inputformat to avoid reading the metadata more than once
Reference<ParquetInputFormat<Tuple>> ref = inputFormatCache.get(location);
parquetInputFormat = ref == null ? null : ref.get();
if (parquetInputFormat == null) {
parquetInputFormat = new UnregisteringParquetInputFormat(location);
inputFormatCache.put(location, new SoftReference<ParquetInputFormat<Tuple>>(parquetInputFormat));
}
}
return parquetInputFormat;
}
@SuppressWarnings("unchecked")
@Override
public void prepareToRead(@SuppressWarnings("rawtypes") RecordReader reader, PigSplit split)
throws IOException {
LOG.debug("LoadFunc.prepareToRead({}, {})", reader, split);
this.reader = reader;
}
@Override
public Tuple getNext() throws IOException {
try {
if (reader.nextKeyValue()) {
return (Tuple)reader.getCurrentValue();
} else {
return null;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ParquetDecodingException("Interrupted", e);
}
}
@Override
public String[] getPartitionKeys(String location, Job job) throws IOException {
if (LOG.isDebugEnabled()) {
String jobToString = String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName());
LOG.debug("LoadMetadata.getPartitionKeys({}, {})", location, jobToString);
}
setInput(location, job);
return null;
}
@Override
public ResourceSchema getSchema(String location, Job job) throws IOException {
if (LOG.isDebugEnabled()) {
String jobToString = String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName());
LOG.debug("LoadMetadata.getSchema({}, {})", location, jobToString);
}
setInput(location, job);
return new ResourceSchema(schema);
}
private void initSchema(Job job) throws IOException {
if (schema != null) {
return;
}
if (schema == null && requestedSchema != null) {
// this is only true in front-end
schema = requestedSchema;
}
if (schema == null) {
// no requested schema => use the schema from the file
final GlobalMetaData globalMetaData = getParquetInputFormat().getGlobalMetaData(job);
schema = getPigSchemaFromMultipleFiles(globalMetaData.getSchema(), globalMetaData.getKeyValueMetaData());
}
if (isElephantBirdCompatible(job)) {
convertToElephantBirdCompatibleSchema(schema);
}
}
private void convertToElephantBirdCompatibleSchema(Schema schema) {
if (schema == null) {
return;
}
for(FieldSchema fieldSchema:schema.getFields()){
if (fieldSchema.type== DataType.BOOLEAN) {
fieldSchema.type=DataType.INTEGER;
}
convertToElephantBirdCompatibleSchema(fieldSchema.schema);
}
}
private boolean isElephantBirdCompatible(Job job) {
return getConfiguration(job).getBoolean(TupleReadSupport.PARQUET_PIG_ELEPHANT_BIRD_COMPATIBLE, false);
}
@Override
public ResourceStatistics getStatistics(String location, Job job)
throws IOException {
if (LOG.isDebugEnabled()) {
String jobToString = String.format("job[id=%s, name=%s]", job.getJobID(), job.getJobName());
LOG.debug("LoadMetadata.getStatistics({}, {})", location, jobToString);
}
/* We need to call setInput since setLocation is not
guaranteed to be called before this */
setInput(location, job);
long length = 0;
try {
for (InputSplit split : getParquetInputFormat().getSplits(job)) {
length += split.getLength();
}
} catch (InterruptedException e) {
LOG.warn("Interrupted", e);
Thread.currentThread().interrupt();
return null;
}
ResourceStatistics stats = new ResourceStatistics();
// TODO use pig-0.12 setBytes api when its available
stats.setmBytes(length / 1024 / 1024);
return stats;
}
@Override
public void setPartitionFilter(Expression expression) throws IOException {
LOG.debug("LoadMetadata.setPartitionFilter({})", expression);
}
@Override
public List<OperatorSet> getFeatures() {
return asList(LoadPushDown.OperatorSet.PROJECTION);
}
protected String getPropertyFromUDFContext(String key) {
UDFContext udfContext = UDFContext.getUDFContext();
return udfContext.getUDFProperties(this.getClass(), new String[]{signature}).getProperty(key);
}
protected Object getFromUDFContext(String key) {
UDFContext udfContext = UDFContext.getUDFContext();
return udfContext.getUDFProperties(this.getClass(), new String[]{signature}).get(key);
}
protected void storeInUDFContext(String key, Object value) {
UDFContext udfContext = UDFContext.getUDFContext();
java.util.Properties props = udfContext.getUDFProperties(
this.getClass(), new String[]{signature});
props.put(key, value);
}
@Override
public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
throws FrontendException {
this.requiredFieldList = requiredFieldList;
if (requiredFieldList == null)
return null;
schema = getSchemaFromRequiredFieldList(schema, requiredFieldList.getFields());
storeInUDFContext(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
storeInUDFContext(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
return new RequiredFieldResponse(true);
}
@Override
public void setUDFContextSignature(String signature) {
this.signature = signature;
}
private Schema getSchemaFromRequiredFieldList(Schema schema, List<RequiredField> fieldList)
throws FrontendException {
Schema s = new Schema();
for (RequiredField rf : fieldList) {
FieldSchema f;
try {
f = schema.getField(rf.getAlias()).clone();
} catch (CloneNotSupportedException e) {
throw new FrontendException("Clone not supported for the fieldschema", e);
}
if (rf.getSubFields() == null) {
s.add(f);
} else {
Schema innerSchema = getSchemaFromRequiredFieldList(f.schema, rf.getSubFields());
if (innerSchema == null) {
return null;
} else {
f.schema = innerSchema;
s.add(f);
}
}
}
return s;
}
@Override
public List<String> getPredicateFields(String s, Job job) throws IOException {
if(!job.getConfiguration().getBoolean(ENABLE_PREDICATE_FILTER_PUSHDOWN, DEFAULT_PREDICATE_PUSHDOWN_ENABLED)) {
return null;
}
List<String> fields = new ArrayList<String>();
for(FieldSchema field : schema.getFields()) {
switch(field.type) {
case DataType.BOOLEAN:
case DataType.INTEGER:
case DataType.LONG:
case DataType.FLOAT:
case DataType.DOUBLE:
case DataType.CHARARRAY:
fields.add(field.alias);
break;
default:
// Skip BYTEARRAY, TUPLE, MAP, BAG, DATETIME, BIGINTEGER, BIGDECIMAL
break;
}
}
return fields;
}
@Override
public List<Expression.OpType> getSupportedExpressionTypes() {
OpType supportedTypes [] = {
OpType.OP_EQ,
OpType.OP_NE,
OpType.OP_GT,
OpType.OP_GE,
OpType.OP_LT,
OpType.OP_LE,
OpType.OP_AND,
OpType.OP_OR,
//OpType.OP_BETWEEN, // not implemented in Pig yet
//OpType.OP_IN, // not implemented in Pig yet
OpType.OP_NOT
};
return Arrays.asList(supportedTypes);
}
@Override
public void setPushdownPredicate(Expression e) throws IOException {
LOG.info("Pig pushdown expression: {}", e);
FilterPredicate pred = buildFilter(e);
LOG.info("Parquet filter predicate expression: {}", pred);
storeInUDFContext(ParquetInputFormat.FILTER_PREDICATE, pred);
}
private FilterPredicate buildFilter(Expression e) {
OpType op = e.getOpType();
if (e instanceof BinaryExpression) {
Expression lhs = ((BinaryExpression) e).getLhs();
Expression rhs = ((BinaryExpression) e).getRhs();
switch (op) {
case OP_AND:
return and(buildFilter(lhs), buildFilter(rhs));
case OP_OR:
return or(buildFilter(lhs), buildFilter(rhs));
case OP_BETWEEN:
BetweenExpression between = (BetweenExpression) rhs;
return and(
buildFilter(OpType.OP_GE, (Column) lhs, (Const) between.getLower()),
buildFilter(OpType.OP_LE, (Column) lhs, (Const) between.getUpper()));
case OP_IN:
FilterPredicate current = null;
for (Object value : ((InExpression) rhs).getValues()) {
FilterPredicate next = buildFilter(OpType.OP_EQ, (Column) lhs, (Const) value);
if (current != null) {
current = or(current, next);
} else {
current = next;
}
}
return current;
}
if (lhs instanceof Column && rhs instanceof Const) {
return buildFilter(op, (Column) lhs, (Const) rhs);
} else if (lhs instanceof Const && rhs instanceof Column) {
return buildFilter(op, (Column) rhs, (Const) lhs);
}
} else if (e instanceof UnaryExpression && op == OpType.OP_NOT) {
return LogicalInverseRewriter.rewrite(
not(buildFilter(((UnaryExpression) e).getExpression())));
}
throw new RuntimeException("Could not build filter for expression: " + e);
}
private FilterPredicate buildFilter(OpType op, Column col, Const value) {
String name = col.getName();
try {
FieldSchema f = schema.getField(name);
switch (f.type) {
case DataType.BOOLEAN:
Operators.BooleanColumn boolCol = booleanColumn(name);
switch(op) {
case OP_EQ: return eq(boolCol, getValue(value, boolCol.getColumnType()));
case OP_NE: return notEq(boolCol, getValue(value, boolCol.getColumnType()));
default: throw new RuntimeException(
"Operation " + op + " not supported for boolean column: " + name);
}
case DataType.INTEGER:
Operators.IntColumn intCol = intColumn(name);
return op(op, intCol, value);
case DataType.LONG:
Operators.LongColumn longCol = longColumn(name);
return op(op, longCol, value);
case DataType.FLOAT:
Operators.FloatColumn floatCol = floatColumn(name);
return op(op, floatCol, value);
case DataType.DOUBLE:
Operators.DoubleColumn doubleCol = doubleColumn(name);
return op(op, doubleCol, value);
case DataType.CHARARRAY:
Operators.BinaryColumn binaryCol = binaryColumn(name);
return op(op, binaryCol, value);
default:
throw new RuntimeException("Unsupported type " + f.type + " for field: " + name);
}
} catch (FrontendException e) {
throw new RuntimeException("Error processing pushdown for column:" + col, e);
}
}
private static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
FilterPredicate op(Expression.OpType op, COL col, Const valueExpr) {
C value = getValue(valueExpr, col.getColumnType());
switch (op) {
case OP_EQ: return eq(col, value);
case OP_NE: return notEq(col, value);
case OP_GT: return gt(col, value);
case OP_GE: return gtEq(col, value);
case OP_LT: return lt(col, value);
case OP_LE: return ltEq(col, value);
}
return null;
}
private static <C extends Comparable<C>> C getValue(Const valueExpr, Class<C> type) {
Object value = valueExpr.getValue();
if (value instanceof String) {
value = Binary.fromString((String) value);
}
return type.cast(value);
}
}