blob: 053c77a9197e4970a663a806122cb140f98584c8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.apache.avro.hadoop.io;
import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.mapred.AvroWrapper;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.hadoop.io.serializer.Deserializer;
/**
* Deserializes AvroWrapper objects within Hadoop.
*
* <p>
* Keys and values containing Avro types are more efficiently serialized outside
* of the WritableSerialization model, so they are wrapped in
* {@link org.apache.avro.mapred.AvroWrapper} objects and deserialization is
* handled by this class.
* </p>
*
* <p>
* MapReduce jobs that use AvroWrapper objects as keys or values need to be
* configured with {@link AvroSerialization}. Use
* {@link org.apache.avro.mapreduce.AvroJob} to help with Job configuration.
* </p>
*
* @param <T> The type of Avro wrapper.
* @param <D> The Java type of the Avro data being wrapped.
*/
public abstract class AvroDeserializer<T extends AvroWrapper<D>, D> implements Deserializer<T> {
  /** The Avro writer schema for deserializing. */
  private final Schema mWriterSchema;

  /** The Avro reader schema for deserializing (never null once constructed with a writer schema). */
  private final Schema mReaderSchema;

  /** The Avro datum reader for deserializing. */
  final DatumReader<D> mAvroDatumReader;

  /**
   * An Avro binary decoder for deserializing. It is only assigned by
   * {@link #open(InputStream)}, so it stays null until the deserializer has been
   * opened.
   */
  private BinaryDecoder mAvroDecoder;

  /**
   * Constructor that builds a reflection-based datum reader.
   *
   * @param writerSchema The Avro writer schema for the data to deserialize.
   * @param readerSchema The Avro reader schema for the data to deserialize (may
   *                     be null, in which case the writer schema is used as the
   *                     reader schema).
   * @param classLoader  The class loader handed to the {@link ReflectData}
   *                     instance backing the reflect datum reader.
   */
  protected AvroDeserializer(Schema writerSchema, Schema readerSchema, ClassLoader classLoader) {
    mWriterSchema = writerSchema;
    mReaderSchema = null != readerSchema ? readerSchema : writerSchema;
    mAvroDatumReader = new ReflectDatumReader<>(mWriterSchema, mReaderSchema, new ReflectData(classLoader));
  }

  /**
   * Constructor that uses a caller-supplied datum reader.
   *
   * @param writerSchema The Avro writer schema for the data to deserialize.
   * @param readerSchema The Avro reader schema for the data to deserialize (may
   *                     be null, in which case the writer schema is used as the
   *                     reader schema).
   * @param datumReader  The Avro datum reader to use for deserialization.
   */
  protected AvroDeserializer(Schema writerSchema, Schema readerSchema, DatumReader<D> datumReader) {
    mWriterSchema = writerSchema;
    mReaderSchema = null != readerSchema ? readerSchema : writerSchema;
    mAvroDatumReader = datumReader;
  }

  /**
   * Gets the writer schema used for deserializing.
   *
   * @return The writer schema.
   */
  public Schema getWriterSchema() {
    return mWriterSchema;
  }

  /**
   * Gets the reader schema used for deserializing.
   *
   * @return The reader schema; this is the writer schema when no reader schema
   *         was supplied at construction time.
   */
  public Schema getReaderSchema() {
    return mReaderSchema;
  }

  /**
   * {@inheritDoc}
   *
   * <p>
   * Obtains a direct binary decoder over the given stream, passing the decoder
   * from any previous call as the reuse candidate.
   * </p>
   *
   * @param inputStream The stream that subsequent {@link #deserialize} calls
   *                    read from.
   */
  @Override
  public void open(InputStream inputStream) throws IOException {
    mAvroDecoder = DecoderFactory.get().directBinaryDecoder(inputStream, mAvroDecoder);
  }

  /**
   * {@inheritDoc}
   *
   * <p>
   * {@link #open(InputStream)} must have been called first so that the decoder
   * is initialized.
   * </p>
   *
   * @param avroWrapperToReuse A wrapper to fill with the next datum, or null to
   *                           have a new wrapper created via
   *                           {@link #createAvroWrapper()}.
   * @return The wrapper (reused or newly created) holding the datum that was
   *         read.
   */
  @Override
  public T deserialize(T avroWrapperToReuse) throws IOException {
    // Create a new Avro wrapper if there isn't one to reuse.
    if (null == avroWrapperToReuse) {
      avroWrapperToReuse = createAvroWrapper();
    }

    // Deserialize the Avro datum from the input stream, offering the wrapper's
    // current datum to the reader as the object to reuse.
    avroWrapperToReuse.datum(mAvroDatumReader.read(avroWrapperToReuse.datum(), mAvroDecoder));
    return avroWrapperToReuse;
  }

  /**
   * {@inheritDoc}
   *
   * <p>
   * Closes the input stream behind the current decoder. If the deserializer was
   * never opened there is no stream to close and this method does nothing.
   * </p>
   */
  @Override
  public void close() throws IOException {
    // The decoder only exists after open(); guard so that closing an unopened
    // deserializer (e.g. from a cleanup path) does not fail with a
    // NullPointerException.
    if (null != mAvroDecoder) {
      mAvroDecoder.inputStream().close();
    }
  }

  /**
   * Creates a new empty <code>T</code> (extends AvroWrapper) instance.
   *
   * @return A new empty <code>T</code> instance.
   */
  protected abstract T createAvroWrapper();
}