blob: f2e4fcca3d71273b0aae40a97816192a13ab6347 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.apache.avro.hadoop.io;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
/**
* Constructs converters that turn objects (usually from the output of a MR job)
* into Avro data that can be serialized.
*
* <p>
* Currently, only the following types have implemented converters:
* <ul>
* <li>AvroKey</li>
* <li>AvroValue</li>
* <li>BooleanWritable</li>
* <li>BytesWritable</li>
* <li>ByteWritable</li>
* <li>DoubleWritable</li>
* <li>FloatWritable</li>
* <li>IntWritable</li>
* <li>LongWritable</li>
* <li>NullWritable</li>
* <li>Text</li>
* </ul>
* </p>
*/
public class AvroDatumConverterFactory extends Configured {
/**
* Creates a new <code>AvroDatumConverterFactory</code> instance.
*
* @param conf The job configuration.
*/
public AvroDatumConverterFactory(Configuration conf) {
super(conf);
}
/**
* Creates a converter that turns objects of type <code>inputClass</code> into
* Avro data.
*
* @param inputClass The type of input data to convert.
* @return A converter that turns objects of type <code>inputClass</code> into
* Avro data.
*/
@SuppressWarnings("unchecked")
public <IN, OUT> AvroDatumConverter<IN, OUT> create(Class<IN> inputClass) {
boolean isMapOnly = ((JobConf) getConf()).getNumReduceTasks() == 0;
if (AvroKey.class.isAssignableFrom(inputClass)) {
Schema schema;
if (isMapOnly) {
schema = AvroJob.getMapOutputKeySchema(getConf());
if (null == schema) {
schema = AvroJob.getOutputKeySchema(getConf());
}
} else {
schema = AvroJob.getOutputKeySchema(getConf());
}
if (null == schema) {
throw new IllegalStateException("Writer schema for output key was not set. Use AvroJob.setOutputKeySchema().");
}
return (AvroDatumConverter<IN, OUT>) new AvroWrapperConverter(schema);
}
if (AvroValue.class.isAssignableFrom(inputClass)) {
Schema schema;
if (isMapOnly) {
schema = AvroJob.getMapOutputValueSchema(getConf());
if (null == schema) {
schema = AvroJob.getOutputValueSchema(getConf());
}
} else {
schema = AvroJob.getOutputValueSchema(getConf());
}
if (null == schema) {
throw new IllegalStateException(
"Writer schema for output value was not set. Use AvroJob.setOutputValueSchema().");
}
return (AvroDatumConverter<IN, OUT>) new AvroWrapperConverter(schema);
}
if (BooleanWritable.class.isAssignableFrom(inputClass)) {
return (AvroDatumConverter<IN, OUT>) new BooleanWritableConverter();
}
if (BytesWritable.class.isAssignableFrom(inputClass)) {
return (AvroDatumConverter<IN, OUT>) new BytesWritableConverter();
}
if (ByteWritable.class.isAssignableFrom(inputClass)) {
return (AvroDatumConverter<IN, OUT>) new ByteWritableConverter();
}
if (DoubleWritable.class.isAssignableFrom(inputClass)) {
return (AvroDatumConverter<IN, OUT>) new DoubleWritableConverter();
}
if (FloatWritable.class.isAssignableFrom(inputClass)) {
return (AvroDatumConverter<IN, OUT>) new FloatWritableConverter();
}
if (IntWritable.class.isAssignableFrom(inputClass)) {
return (AvroDatumConverter<IN, OUT>) new IntWritableConverter();
}
if (LongWritable.class.isAssignableFrom(inputClass)) {
return (AvroDatumConverter<IN, OUT>) new LongWritableConverter();
}
if (NullWritable.class.isAssignableFrom(inputClass)) {
return (AvroDatumConverter<IN, OUT>) new NullWritableConverter();
}
if (Text.class.isAssignableFrom(inputClass)) {
return (AvroDatumConverter<IN, OUT>) new TextConverter();
}
throw new UnsupportedOperationException("Unsupported input type: " + inputClass.getName());
}
/** Converts AvroWrappers into their wrapped Avro data. */
public static class AvroWrapperConverter extends AvroDatumConverter<AvroWrapper<?>, Object> {
private final Schema mSchema;
public AvroWrapperConverter(Schema schema) {
mSchema = schema;
}
/** {@inheritDoc} */
@Override
public Object convert(AvroWrapper<?> input) {
return input.datum();
}
/** {@inheritDoc} */
@Override
public Schema getWriterSchema() {
return mSchema;
}
}
/** Converts BooleanWritables into Booleans. */
public static class BooleanWritableConverter extends AvroDatumConverter<BooleanWritable, Boolean> {
private final Schema mSchema;
/** Constructor. */
public BooleanWritableConverter() {
mSchema = Schema.create(Schema.Type.BOOLEAN);
}
/** {@inheritDoc} */
@Override
public Boolean convert(BooleanWritable input) {
return input.get();
}
/** {@inheritDoc} */
@Override
public Schema getWriterSchema() {
return mSchema;
}
}
/** Converts BytesWritables into ByteBuffers. */
public static class BytesWritableConverter extends AvroDatumConverter<BytesWritable, ByteBuffer> {
private final Schema mSchema;
/** Constructor. */
public BytesWritableConverter() {
mSchema = Schema.create(Schema.Type.BYTES);
}
/** {@inheritDoc} */
@Override
public ByteBuffer convert(BytesWritable input) {
return ByteBuffer.wrap(input.getBytes());
}
/** {@inheritDoc} */
@Override
public Schema getWriterSchema() {
return mSchema;
}
}
/** Converts ByteWritables into GenericFixed of size 1. */
public static class ByteWritableConverter extends AvroDatumConverter<ByteWritable, GenericFixed> {
private final Schema mSchema;
/** Constructor. */
public ByteWritableConverter() {
mSchema = Schema.createFixed("Byte", "A single byte", "org.apache.avro.mapreduce", 1);
}
/** {@inheritDoc} */
@Override
public GenericFixed convert(ByteWritable input) {
return new GenericData.Fixed(mSchema, new byte[] { input.get() });
}
/** {@inheritDoc} */
@Override
public Schema getWriterSchema() {
return mSchema;
}
}
/** Converts DoubleWritables into Doubles. */
public static class DoubleWritableConverter extends AvroDatumConverter<DoubleWritable, Double> {
private final Schema mSchema;
/** Constructor. */
public DoubleWritableConverter() {
mSchema = Schema.create(Schema.Type.DOUBLE);
}
/** {@inheritDoc} */
@Override
public Double convert(DoubleWritable input) {
return input.get();
}
/** {@inheritDoc} */
@Override
public Schema getWriterSchema() {
return mSchema;
}
}
/** Converts FloatWritables into Floats. */
public static class FloatWritableConverter extends AvroDatumConverter<FloatWritable, Float> {
private final Schema mSchema;
/** Constructor. */
public FloatWritableConverter() {
mSchema = Schema.create(Schema.Type.FLOAT);
}
/** {@inheritDoc} */
@Override
public Float convert(FloatWritable input) {
return input.get();
}
/** {@inheritDoc} */
@Override
public Schema getWriterSchema() {
return mSchema;
}
}
/** Converts IntWritables into Ints. */
public static class IntWritableConverter extends AvroDatumConverter<IntWritable, Integer> {
private final Schema mSchema;
/** Constructor. */
public IntWritableConverter() {
mSchema = Schema.create(Schema.Type.INT);
}
/** {@inheritDoc} */
@Override
public Integer convert(IntWritable input) {
return input.get();
}
/** {@inheritDoc} */
@Override
public Schema getWriterSchema() {
return mSchema;
}
}
/** Converts LongWritables into Longs. */
public static class LongWritableConverter extends AvroDatumConverter<LongWritable, Long> {
private final Schema mSchema;
/** Constructor. */
public LongWritableConverter() {
mSchema = Schema.create(Schema.Type.LONG);
}
/** {@inheritDoc} */
@Override
public Long convert(LongWritable input) {
return input.get();
}
/** {@inheritDoc} */
@Override
public Schema getWriterSchema() {
return mSchema;
}
}
/** Converts NullWritables into Nulls. */
public static class NullWritableConverter extends AvroDatumConverter<NullWritable, Object> {
private final Schema mSchema;
/** Constructor. */
public NullWritableConverter() {
mSchema = Schema.create(Schema.Type.NULL);
}
/** {@inheritDoc} */
@Override
public Object convert(NullWritable input) {
return null;
}
/** {@inheritDoc} */
@Override
public Schema getWriterSchema() {
return mSchema;
}
}
/** Converts Text into CharSequences. */
public static class TextConverter extends AvroDatumConverter<Text, CharSequence> {
private final Schema mSchema;
/** Constructor. */
public TextConverter() {
mSchema = Schema.create(Schema.Type.STRING);
}
/** {@inheritDoc} */
@Override
public CharSequence convert(Text input) {
return input.toString();
}
/** {@inheritDoc} */
@Override
public Schema getWriterSchema() {
return mSchema;
}
}
}