/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.crunch.io.parquet;

import java.io.IOException;
import java.util.List;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.crunch.ReadableData;
import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.io.FormatBundle;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.io.impl.FileSourceImpl;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.parquet.avro.AvroParquetInputFormat;
import org.apache.parquet.avro.AvroReadSupport;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.hadoop.ParquetInputSplit;
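
/**
 * A {@link ReadableSource} for reading Avro records ({@code IndexedRecord} subtypes) from
 * Parquet-formatted files, with optional column projection and record filtering.
 *
 * <p>A minimal usage sketch; {@code MyRecord} (a generated Avro specific record class) and the
 * input path are hypothetical:
 *
 * <pre>{@code
 * AvroParquetFileSource<MyRecord> source = AvroParquetFileSource
 *     .builder(MyRecord.class)
 *     .includeField("id")     // project only the "id" and "name" columns
 *     .includeField("name")
 *     .build(new Path("/data/records.parquet"));
 * }</pre>
 */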
public class AvroParquetFileSource<T extends IndexedRecord> extends FileSourceImpl<T> implements ReadableSource<T> {

  private static final String AVRO_READ_SCHEMA = "parquet.avro.read.schema";

  private final String projSchema;
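
  /**
   * Builds the {@code FormatBundle} that configures {@code AvroParquetInputFormat}: the Avro
   * read schema, an optional column projection, and an optional record filter class.
   */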
private static <S> FormatBundle<AvroParquetInputFormat> getBundle(
AvroType<S> ptype,
Schema projSchema,
Class<? extends UnboundRecordFilter> filterClass) {
FormatBundle<AvroParquetInputFormat> fb = FormatBundle.forInput(AvroParquetInputFormat.class)
.set(AVRO_READ_SCHEMA, ptype.getSchema().toString());
if (projSchema != null) {
fb.set(AvroReadSupport.AVRO_REQUESTED_PROJECTION, projSchema.toString());
}
if (filterClass != null) {
fb.set("parquet.read.filter", filterClass.getName());
}
if (!FileSplit.class.isAssignableFrom(ParquetInputSplit.class)) {
// Older ParquetRecordReader expects ParquetInputSplits, not FileSplits, so it
// doesn't work with CombineFileInputFormat
fb.set(RuntimeParameters.DISABLE_COMBINE_FILE, "true");
}
return fb;
}
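
  /**
   * Read the Parquet data at the given path using the schema of the {@code AvroType}.
   *
   * @param path the path of the file to read
   * @param ptype the AvroType to use in reading the file
   */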
  public AvroParquetFileSource(Path path, AvroType<T> ptype) {
    this(ImmutableList.of(path), ptype);
  }

/**
* Read the Parquet data at the given path using the schema of the {@code AvroType}, and projecting
* a subset of the columns from this schema via the separately given {@code Schema}.
*
* @param path the path of the file to read
* @param ptype the AvroType to use in reading the file
* @param projSchema the subset of columns from the input schema to read
*/
public AvroParquetFileSource(Path path, AvroType<T> ptype, Schema projSchema) {
this(ImmutableList.of(path), ptype, projSchema);
}
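
  /**
   * Read the Parquet data at the given paths using the schema of the {@code AvroType}.
   *
   * @param paths the list of paths to read
   * @param ptype the AvroType to use in reading the file
   */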
  public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype) {
    this(paths, ptype, null, null);
  }

/**
* Read the Parquet data at the given paths using the schema of the {@code AvroType}, and projecting
* a subset of the columns from this schema via the separately given {@code Schema}.
*
* @param paths the list of paths to read
* @param ptype the AvroType to use in reading the file
* @param projSchema the subset of columns from the input schema to read
*/
public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema projSchema) {
this(paths, ptype, projSchema, null);
}
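
  /**
   * Read the Parquet data at the given paths using the schema of the {@code AvroType}, using
   * the given {@code UnboundRecordFilter} class to filter the input records.
   *
   * @param paths the list of paths to read
   * @param ptype the AvroType to use in reading the file
   * @param filterClass the class of the filter to apply to the input records
   */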
  public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype,
      Class<? extends UnboundRecordFilter> filterClass) {
    this(paths, ptype, null, filterClass);
  }

  /**
   * Read the Parquet data at the given paths using the schema of the {@code AvroType}, projecting
   * a subset of the columns from this schema via the separately given {@code Schema}, and using
   * the given filter class to select the input records.
   *
   * @param paths the list of paths to read
   * @param ptype the AvroType to use in reading the file
   * @param projSchema the subset of columns from the input schema to read
   * @param filterClass the class of the filter to apply to the input records
   */
public AvroParquetFileSource(List<Path> paths, AvroType<T> ptype, Schema projSchema,
Class<? extends UnboundRecordFilter> filterClass) {
super(paths, ptype, getBundle(ptype, projSchema, filterClass));
this.projSchema = projSchema == null ? null : projSchema.toString();
}
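
  /**
   * Returns the projection schema supplied at construction time, or null if no projection
   * was specified.
   */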
  public Schema getProjectedSchema() {
    return projSchema == null ? null : (new Schema.Parser()).parse(projSchema);
  }

  @Override
  public Iterable<T> read(Configuration conf) throws IOException {
    return read(conf, getFileReaderFactory((AvroType<T>) ptype));
  }

@Override
public ReadableData<T> asReadable() {
return new AvroParquetReadableData<T>(paths, (AvroType<T>) ptype);
}
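
  /**
   * Creates the {@code AvroParquetFileReaderFactory} used by {@link #read(Configuration)};
   * protected so that subclasses can customize how records are read.
   */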
  protected AvroParquetFileReaderFactory<T> getFileReaderFactory(AvroType<T> ptype) {
    return new AvroParquetFileReaderFactory<T>(ptype);
  }

  @Override
  public Converter<?, ?, ?, ?> getConverter() {
    return new AvroParquetConverter<T>((AvroType<T>) ptype);
  }

@Override
public String toString() {
return "Parquet(" + pathsAsString() + ((projSchema == null) ? ")" : ") -> " + projSchema);
}
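
  /**
   * Creates a {@code Builder} for reading Parquet data as instances of the given Avro
   * specific record class.
   *
   * @param clazz the generated Avro SpecificRecord class to read
   */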
public static <T extends SpecificRecord> Builder<T> builder(Class<T> clazz) {
return new Builder<T>(Preconditions.checkNotNull(clazz));
}
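
  /**
   * Creates a {@code Builder} for reading Parquet data as {@code GenericRecord}s with the
   * given record schema.
   *
   * @param schema the Avro record schema to read
   */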
  public static Builder<GenericRecord> builder(Schema schema) {
    Preconditions.checkNotNull(schema);
    Preconditions.checkArgument(Schema.Type.RECORD.equals(schema.getType()),
        "Schema must be of record type, but found: %s", schema.getType());
    return new Builder<GenericRecord>(schema);
  }

/**
* Helper class for constructing an {@code AvroParquetFileSource} that only reads a subset of the
* fields defined in an Avro schema.
*/
  public static class Builder<T extends IndexedRecord> {

    private Class<T> clazz;
    private Schema baseSchema;
    private List<Schema.Field> fields = Lists.newArrayList();
    private Class<? extends UnboundRecordFilter> filterClass;

    private Builder(Class<T> clazz) {
      this.clazz = clazz;
      this.baseSchema = ReflectionUtils.newInstance(clazz, null).getSchema();
    }

private Builder(Schema baseSchema) {
this.baseSchema = baseSchema;
}
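
    /**
     * Adds the named field from the base schema to the projection.
     *
     * @param fieldName the name of the field to include
     * @throws IllegalArgumentException if the base schema has no field with this name
     */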
    public Builder<T> includeField(String fieldName) {
      Schema.Field field = baseSchema.getField(fieldName);
      if (field == null) {
        throw new IllegalArgumentException("No field " + fieldName + " in schema: " + baseSchema.getName());
      }
      fields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal(), field.order()));
      return this;
    }
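
    /**
     * Sets the {@code UnboundRecordFilter} class used to filter the input records.
     */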
    public Builder<T> filterClass(Class<? extends UnboundRecordFilter> filterClass) {
      this.filterClass = filterClass;
      return this;
    }
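
    /**
     * Builds the source to read from the given path.
     */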
public AvroParquetFileSource<T> build(Path path) {
return build(ImmutableList.of(path));
}
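
    /**
     * Builds the source to read from the given paths, applying the configured projection
     * and filter if any were set.
     */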
    @SuppressWarnings({"unchecked", "rawtypes"})
    public AvroParquetFileSource<T> build(List<Path> paths) {
      // A specific-record Builder carries the record class; a generic-record Builder
      // carries only the schema.
      AvroType<T> at = clazz == null
          ? (AvroType<T>) Avros.generics(baseSchema)
          : Avros.specifics((Class) clazz);
      if (fields.isEmpty()) {
        return new AvroParquetFileSource<T>(paths, at, filterClass);
      } else {
        // Build a projection schema containing only the included fields.
        Schema projected = Schema.createRecord(
            baseSchema.getName(),
            baseSchema.getDoc(),
            baseSchema.getNamespace(),
            baseSchema.isError());
        projected.setFields(fields);
        return new AvroParquetFileSource<T>(paths, at, projected, filterClass);
      }
    }
}
}