blob: 0c38105a9ca6d026e7115d5752f15892e5d5462e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.types.avro;
import com.google.common.collect.Maps;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.crunch.io.FormatBundle;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import java.util.Map;
/**
* AvroMode is an immutable object used for configuring the reading and writing of Avro types.
* The mode will not be used or honored unless it has been appropriately configured using one of the supported
* methods, e.g. {@link #configure(Configuration)}, {@link #configure(FormatBundle)} or
* {@link #configureShuffle(Configuration)}. Certain sources might also support specifying a specific mode to use.
*/
public class AvroMode implements ReaderWriterFactory {

  /**
   * Internal enum which represents the various Avro data types.
   */
  public enum ModeType {
    SPECIFIC, REFLECT, GENERIC
  }

  /**
   * Default mode to use for reading and writing {@link ReflectData Reflect} types.
   */
  public static final AvroMode REFLECT = new AvroMode(ModeType.REFLECT, Avros.REFLECT_DATA_FACTORY_CLASS);

  /**
   * Default mode to use for reading and writing {@link SpecificData Specific} types.
   */
  public static final AvroMode SPECIFIC = new AvroMode(ModeType.SPECIFIC, "crunch.specificfactory");

  /**
   * Default mode to use for reading and writing {@link GenericData Generic} types.
   */
  public static final AvroMode GENERIC = new AvroMode(ModeType.GENERIC, "crunch.genericfactory");

  /** Property holding the {@link ModeType} name used for input/output. */
  public static final String AVRO_MODE_PROPERTY = "crunch.avro.mode";

  /** Property holding the {@link ModeType} name used during the shuffle. */
  public static final String AVRO_SHUFFLE_MODE_PROPERTY = "crunch.avro.shuffle.mode";

  /**
   * Creates an AvroMode based on the {@link #AVRO_MODE_PROPERTY} property in the {@code conf}.
   * {@link ModeType#REFLECT} is passed as the default value when reading the property.
   *
   * @param conf The configuration holding the properties for mode to be created.
   * @return an AvroMode based on the {@link #AVRO_MODE_PROPERTY} property in the {@code conf}.
   */
  public static AvroMode fromConfiguration(Configuration conf) {
    AvroMode mode = getMode(conf.getEnum(AVRO_MODE_PROPERTY, ModeType.REFLECT));
    return mode.withFactoryFromConfiguration(conf);
  }

  /**
   * Creates an AvroMode based on the {@link #AVRO_SHUFFLE_MODE_PROPERTY} property in the {@code conf}.
   * {@link ModeType#REFLECT} is passed as the default value when reading the property.
   *
   * @param conf The configuration holding the properties for mode to be created.
   * @return an AvroMode based on the {@link #AVRO_SHUFFLE_MODE_PROPERTY} property in the {@code conf}.
   */
  public static AvroMode fromShuffleConfiguration(Configuration conf) {
    AvroMode mode = getMode(conf.getEnum(AVRO_SHUFFLE_MODE_PROPERTY, ModeType.REFLECT));
    return mode.withFactoryFromConfiguration(conf);
  }

  /**
   * Creates an {@link AvroMode} based upon the specified {@code type}. Reflect takes precedence over
   * specific, which takes precedence over generic.
   *
   * @param type the Avro type which indicates a specific mode.
   * @return an {@link AvroMode} based upon the specified {@code type}.
   */
  public static AvroMode fromType(AvroType<?> type) {
    if (type.hasReflect()) {
      if (type.hasSpecific()) {
        // The type mixes reflect and specific schemas; delegate the check to Avros.
        Avros.checkCombiningSpecificAndReflectionSchemas();
      }
      return REFLECT;
    }
    if (type.hasSpecific()) {
      return SPECIFIC;
    }
    return GENERIC;
  }

  /**
   * Maps a {@link ModeType} to the corresponding default mode constant.
   *
   * @param modeType the mode type to look up
   * @return {@link #SPECIFIC} or {@link #GENERIC} for those types, {@link #REFLECT} otherwise
   */
  private static AvroMode getMode(ModeType modeType) {
    switch (modeType) {
      case SPECIFIC:
        return SPECIFIC;
      case GENERIC:
        return GENERIC;
      case REFLECT:
      default:
        return REFLECT;
    }
  }

  /** Class loader handed to the default reflect/specific readers; {@code null} when none was set. */
  private static ClassLoader specificLoader = null;

  /**
   * Set the {@code ClassLoader} that will be used for loading Avro {@code org.apache.avro.specific.SpecificRecord}
   * and reflection implementation classes. It is typically not necessary to call this method -- it should only be used
   * if a specific class loader is needed in order to load the specific datum classes.
   *
   * @param loader the {@code ClassLoader} to be used for loading specific datum classes
   */
  public static void setSpecificClassLoader(ClassLoader loader) {
    specificLoader = loader;
  }

  /**
   * Get the configured {@code ClassLoader} to be used for loading Avro {@code org.apache.avro.specific.SpecificRecord}
   * and reflection implementation classes. The return value may be null.
   *
   * @return the configured {@code ClassLoader} for loading specific or reflection datum classes, may be null
   */
  public static ClassLoader getSpecificClassLoader() {
    return specificLoader;
  }

  /**
   * Internal method for setting the specific class loader if none is already set. If no specific class loader is set,
   * the given class loader will be set as the specific class loader. If a specific class loader is already set, this
   * will be a no-op.
   *
   * @param loader the {@code ClassLoader} to be registered as the specific class loader if no specific class loader
   *               is already set
   */
  static void registerSpecificClassLoaderInternal(ClassLoader loader) {
    if (specificLoader == null) {
      setSpecificClassLoader(loader);
    }
  }

  /**
   * Optional factory that replaces the built-in {@link #getData()}, {@link #getReader(Schema)} and
   * {@link #getWriter(Schema)} behavior; {@code null} means the defaults for {@link #modeType} are used.
   */
  private final ReaderWriterFactory factory;

  /**
   * The name of the {@link Configuration} property under which the factory class is stored.
   */
  private final String propName;

  /**
   * The mode type representing the Avro data form.
   */
  private final ModeType modeType;

  private AvroMode(ModeType modeType, ReaderWriterFactory factory, String propName) {
    this.factory = factory;
    this.propName = propName;
    this.modeType = modeType;
  }

  private AvroMode(ModeType modeType, String propName) {
    this(modeType, null, propName);
  }

  /**
   * Returns a {@link GenericData} instance based on the mode type, or the one supplied by the
   * custom factory when a factory is set.
   *
   * @return a {@link GenericData} instance based on the mode type.
   */
  public GenericData getData() {
    if (factory != null) {
      return factory.getData();
    }

    switch (modeType) {
      case REFLECT:
        return ReflectData.AllowNull.get();
      case SPECIFIC:
        return SpecificData.get();
      default:
        return GenericData.get();
    }
  }

  /**
   * Creates a {@code DatumReader} based on the {@code schema}. When a custom factory is set it is
   * used; otherwise the reflect and specific readers are built with the class loader from
   * {@link #getSpecificClassLoader()} if one has been set.
   *
   * @param schema the schema to be read
   * @param <T> the record type created by the reader.
   * @return a {@code DatumReader} based on the {@code schema}.
   */
  public <T> DatumReader<T> getReader(Schema schema) {
    if (factory != null) {
      return factory.getReader(schema);
    }

    // Read the static once so the null check and the use see the same value.
    ClassLoader loader = specificLoader;
    switch (modeType) {
      case REFLECT:
        if (loader != null) {
          return new ReflectDatumReader<T>(schema, schema, new ReflectData(loader));
        }
        return new ReflectDatumReader<T>(schema);
      case SPECIFIC:
        if (loader != null) {
          return new SpecificDatumReader<T>(schema, schema, new SpecificData(loader));
        }
        return new SpecificDatumReader<T>(schema);
      default:
        return new GenericDatumReader<T>(schema);
    }
  }

  /**
   * Creates a {@code DatumWriter} based on the {@code schema}. When a custom factory is set it is
   * used instead of the default writer for the mode type.
   *
   * @param schema the schema to be written
   * @param <T> the record type consumed by the writer.
   * @return a {@code DatumWriter} based on the {@code schema}.
   */
  public <T> DatumWriter<T> getWriter(Schema schema) {
    if (factory != null) {
      return factory.getWriter(schema);
    }

    switch (modeType) {
      case REFLECT:
        return new ReflectDatumWriter<T>(schema);
      case SPECIFIC:
        return new SpecificDatumWriter<T>(schema);
      default:
        return new GenericDatumWriter<T>(schema);
    }
  }

  /**
   * Creates a new {@code AvroMode} instance which will utilize the {@code factory} instance
   * for creating Avro readers and writers.
   *
   * @param factory factory implementation for the mode to use
   * @return a new {@code AvroMode} instance which will utilize the {@code factory} instance
   * for creating Avro readers and writers.
   * @deprecated use {@link #withFactory(ReaderWriterFactory)} instead.
   */
  @Deprecated
  public AvroMode override(ReaderWriterFactory factory) {
    return withFactory(factory);
  }

  /**
   * Creates a new {@code AvroMode} instance which will utilize the {@code factory} instance
   * for creating Avro readers and writers. If {@code null} the default factory for the mode
   * will be used. Passing this mode itself returns this mode unchanged.
   *
   * @param factory factory implementation for the mode to use
   * @return a new {@code AvroMode} instance which will utilize the {@code factory} instance
   * for creating Avro readers and writers.
   */
  public AvroMode withFactory(ReaderWriterFactory factory) {
    if (factory == this) {
      // Using ourselves as the factory would make the delegating methods recurse.
      return this;
    }
    return withReaderWriterFactory(factory);
  }

  /**
   * Populates the {@code conf} with mode specific settings for use during the shuffle phase.
   * In addition to {@link #AVRO_SHUFFLE_MODE_PROPERTY} this applies everything that
   * {@link #configure(Configuration)} sets.
   *
   * @param conf the configuration to populate.
   */
  public void configureShuffle(Configuration conf) {
    conf.setEnum(AVRO_SHUFFLE_MODE_PROPERTY, modeType);
    configure(conf);
  }

  /**
   * Populates the {@code bundle} with mode specific settings for the specific {@link FormatBundle}.
   *
   * @param bundle the bundle to populate.
   */
  public void configure(FormatBundle bundle) {
    bundle.set(AVRO_MODE_PROPERTY, modeType.toString());
    if (factory != null) {
      bundle.set(propName, factory.getClass().getName());
    }
  }

  /**
   * Populates the {@code conf} with mode specific settings.
   *
   * @param conf the configuration to populate.
   */
  public void configure(Configuration conf) {
    conf.set(AVRO_MODE_PROPERTY, modeType.toString());
    if (factory != null) {
      conf.setClass(propName, factory.getClass(), ReaderWriterFactory.class);
    }
  }

  /**
   * Returns the entries that a {@code Configuration} instance needs to enable
   * this AvroMode as a serializable map of key-value pairs.
   *
   * @return a new map holding the mode type and, when a custom factory is set, its class name
   */
  public Map<String, String> getModeProperties() {
    Map<String, String> props = Maps.newHashMap();
    props.put(AVRO_MODE_PROPERTY, modeType.toString());
    if (factory != null) {
      // Use the binary name (Outer$Inner), matching configure(FormatBundle): the canonical name
      // (Outer.Inner, or null for anonymous/local classes) cannot be resolved by class loading.
      props.put(propName, factory.getClass().getName());
    }
    return props;
  }

  /**
   * Populates the {@code conf} with mode specific settings.
   *
   * @param conf the configuration to populate.
   * @deprecated use {@link #configure(org.apache.hadoop.conf.Configuration)}
   */
  @Deprecated
  public void configureFactory(Configuration conf) {
    configure(conf);
  }

  /**
   * Creates a new {@code AvroMode} instance which will utilize the {@code factory} instance
   * for creating Avro readers and writers. If {@code null} the default factory for the mode
   * will be used.
   *
   * @param readerWriterFactory factory implementation for the mode to use
   * @return a new {@code AvroMode} instance which will utilize the {@code factory} instance
   * for creating Avro readers and writers.
   */
  private AvroMode withReaderWriterFactory(ReaderWriterFactory readerWriterFactory) {
    return new AvroMode(modeType, readerWriterFactory, propName);
  }

  /**
   * Returns the factory that will be used for the mode: the custom factory when one is set,
   * otherwise this mode itself.
   *
   * @return the factory that will be used for the mode.
   */
  public ReaderWriterFactory getFactory() {
    return factory != null ? factory : this;
  }

  /**
   * Two modes are equal when their mode type, property name and factory are all equal.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    // instanceof is false for null, so no separate null check is needed.
    if (!(o instanceof AvroMode)) {
      return false;
    }
    AvroMode that = (AvroMode) o;
    if (!modeType.equals(that.modeType) || !propName.equals(that.propName)) {
      return false;
    }
    if (factory == null) {
      return that.factory == null;
    }
    return that.factory != null && factory.equals(that.factory);
  }

  @Override
  public int hashCode() {
    int hash = propName.hashCode();
    hash = 31 * hash + modeType.hashCode();
    if (factory != null) {
      hash = 31 * hash + factory.hashCode();
    }
    return hash;
  }

  /**
   * Returns a mode of the same type whose factory is the class registered in {@code conf} under
   * this mode's factory property. When the lookup yields this object's own class (which is the
   * default passed to the lookup), this mode is returned unchanged.
   *
   * @param conf the configuration to read the factory class from; also passed to the new factory instance
   * @return this mode, or a new mode using an instance of the configured factory class
   */
  @SuppressWarnings("unchecked")
  public AvroMode withFactoryFromConfiguration(Configuration conf) {
    // although the shuffle and input/output use different properties for mode,
    // this is shared - only one ReaderWriterFactory can be used.
    Class<?> factoryClass = conf.getClass(propName, this.getClass());
    if (factoryClass == this.getClass()) {
      return this;
    }
    return withReaderWriterFactory((ReaderWriterFactory)
        ReflectionUtils.newInstance(factoryClass, conf));
  }
}