blob: 83e5e80966f8be7fbecfff2896a8aaec81d908a0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.generic;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.ResolvingDecoder;
import org.apache.avro.util.Utf8;
import org.apache.avro.util.WeakIdentityHashMap;
/** {@link DatumReader} for generic Java objects. */
public class GenericDatumReader<D> implements DatumReader<D> {
/**
* Source of record/array/map/fixed/enum factories, logical-type conversions and
* the fast-reader settings used by this reader.
*/
private final GenericData data;
/** The writer's schema, as returned by {@link #getSchema()}. */
private Schema actual;
/** The reader's schema, as returned by {@link #getExpected()}. */
private Schema expected;
/**
* Fast reader built on first use by {@link #read(Object, Decoder)} when the
* fast reader is enabled on {@link #data}; {@code null} until then.
*/
private DatumReader<D> fastDatumReader = null;
/**
* Resolver remembered for the thread stored in {@link #creator}; see
* {@link #getResolver(Schema, Schema)}.
*/
private ResolvingDecoder creatorResolver = null;
/** The thread that constructed this reader. */
private final Thread creator;
/**
* Construct with neither schema set, using the {@link GenericData} returned by
* {@link GenericData#get()}.
*/
public GenericDatumReader() {
this(null, null, GenericData.get());
}
/** Construct where the writer's and reader's schemas are the same. */
public GenericDatumReader(Schema schema) {
this(schema, schema, GenericData.get());
}
/** Construct given writer's and reader's schema. */
public GenericDatumReader(Schema writer, Schema reader) {
this(writer, reader, GenericData.get());
}
/**
* Construct given writer's and reader's schema and the {@link GenericData}
* implementation to use.
*/
public GenericDatumReader(Schema writer, Schema reader, GenericData data) {
this(data);
this.actual = writer;
this.expected = reader;
}
/**
* Construct with only a {@link GenericData} implementation; both schemas are
* left unset. The constructing thread is recorded as the creator thread.
*/
protected GenericDatumReader(GenericData data) {
this.data = data;
this.creator = Thread.currentThread();
}
/**
* Return the {@link GenericData} implementation.
*
* @return the instance supplied at construction time
*/
public GenericData getData() {
return data;
}
/**
* Return the writer's schema.
*
* @return the writer's schema, or {@code null} if none has been set
*/
public Schema getSchema() {
return actual;
}
/**
 * Set the writer's schema. When no reader's schema has been set yet, the
 * writer's schema is adopted as the reader's schema as well. The cached
 * resolver and the cached fast reader are discarded.
 *
 * @param writer the writer's schema
 */
@Override
public void setSchema(Schema writer) {
  actual = writer;
  if (expected == null) {
    expected = writer;
  }
  // Anything built for the previous writer schema is stale now.
  fastDatumReader = null;
  creatorResolver = null;
}
/**
* Get the reader's schema.
*
* @return the reader's schema, or {@code null} if none has been set
*/
public Schema getExpected() {
return expected;
}
/**
 * Set the reader's schema.
 *
 * <p>
 * Both the cached resolver and the cached fast reader were built for the
 * previous reader's schema, so both are discarded here, exactly as
 * {@link #setSchema(Schema)} does when the writer's schema changes. Without
 * resetting the fast reader, a later {@link #read(Object, Decoder)} with the
 * fast reader enabled would keep decoding against the old reader's schema.
 *
 * @param reader the new reader's schema
 */
public void setExpected(Schema reader) {
  this.expected = reader;
  creatorResolver = null;
  fastDatumReader = null;
}
private static final ThreadLocal<Map<Schema, Map<Schema, ResolvingDecoder>>> RESOLVER_CACHE = ThreadLocal
    .withInitial(WeakIdentityHashMap::new);

/**
 * Gets a resolving decoder for use by this GenericDatumReader. Unstable API.
 * Resolvers are expensive to construct, so they are kept in a thread-local
 * cache keyed first by writer's schema and then by reader's schema.
 *
 * <p>
 * On the thread that constructed this reader, the most recently obtained
 * resolver is additionally remembered in a field and returned directly on
 * later calls, without consulting the arguments, until one of the schema
 * setters clears it.
 *
 * @param actual   the writer's schema
 * @param expected the reader's schema
 * @return a resolver for the given schema pair
 * @throws IOException if the resolver cannot be constructed
 */
protected final ResolvingDecoder getResolver(Schema actual, Schema expected) throws IOException {
  final boolean onCreatorThread = Thread.currentThread() == creator;
  if (onCreatorThread && creatorResolver != null) {
    return creatorResolver;
  }

  final Map<Schema, Map<Schema, ResolvingDecoder>> byWriter = RESOLVER_CACHE.get();
  Map<Schema, ResolvingDecoder> byReader = byWriter.get(actual);
  if (byReader == null) {
    byReader = new WeakIdentityHashMap<>();
    byWriter.put(actual, byReader);
  }

  ResolvingDecoder resolver = byReader.get(expected);
  if (resolver == null) {
    resolver = DecoderFactory.get().resolvingDecoder(Schema.applyAliases(actual, expected), expected, null);
    byReader.put(expected, resolver);
  }

  if (onCreatorThread) {
    creatorResolver = resolver;
  }
  return resolver;
}
/**
 * Reads one datum from the decoder. When the fast reader is enabled on the
 * {@link GenericData}, the work is handed to a lazily created fast reader;
 * otherwise a resolving decoder is configured on top of {@code in}, the datum
 * is read against the reader's schema, and the resolver is drained.
 *
 * @param reuse an instance that may be reused, or {@code null}
 * @param in    the decoder to read from
 * @return the datum read
 * @throws IOException if reading fails
 */
@Override
@SuppressWarnings("unchecked")
public D read(D reuse, Decoder in) throws IOException {
  if (!data.isFastReaderEnabled()) {
    final ResolvingDecoder resolver = getResolver(actual, expected);
    resolver.configure(in);
    final D datum = (D) read(reuse, expected, resolver);
    resolver.drain();
    return datum;
  }

  if (fastDatumReader == null) {
    fastDatumReader = data.getFastReaderBuilder().createDatumReader(actual, expected);
  }
  return fastDatumReader.read(reuse, in);
}
/**
 * Called to read data. Reads the raw datum for {@code expected} and, when the
 * schema carries a logical type for which the {@link GenericData} has a
 * conversion, returns the converted value instead.
 *
 * @param old      an instance that may be reused, or {@code null}
 * @param expected the reader's schema for this datum
 * @param in       the resolving decoder to read from
 * @return the datum, converted if a conversion applies
 * @throws IOException if reading fails
 */
protected Object read(Object old, Schema expected, ResolvingDecoder in) throws IOException {
  final Object raw = readWithoutConversion(old, expected, in);
  final LogicalType logicalType = expected.getLogicalType();
  if (logicalType == null) {
    return raw;
  }
  final Conversion<?> conversion = getData().getConversionFor(logicalType);
  if (conversion == null) {
    return raw;
  }
  return convert(raw, expected, logicalType, conversion);
}
/**
 * Reads the raw datum for {@code expected} and then applies the given
 * conversion to it unconditionally.
 *
 * @param old         an instance that may be reused, or {@code null}
 * @param expected    the reader's schema for this datum
 * @param logicalType the logical type handed to the conversion
 * @param conversion  the conversion to apply
 * @param in          the resolving decoder to read from
 * @return the converted datum
 * @throws IOException if reading fails
 */
protected Object readWithConversion(Object old, Schema expected, LogicalType logicalType, Conversion<?> conversion,
    ResolvingDecoder in) throws IOException {
  final Object raw = readWithoutConversion(old, expected, in);
  return convert(raw, expected, logicalType, conversion);
}
/**
 * Reads one datum of the given schema without applying any logical-type
 * conversion to the datum itself, dispatching on the schema's type. A union is
 * resolved by reading the branch index and then reading that branch through
 * {@link #read(Object, Schema, ResolvingDecoder)}.
 *
 * @param old      an instance that may be reused, or {@code null}
 * @param expected the reader's schema for this datum
 * @param in       the resolving decoder to read from
 * @return the datum read; {@code null} for a NULL schema
 * @throws IOException          if reading fails
 * @throws AvroRuntimeException if the schema type is not handled
 */
protected Object readWithoutConversion(Object old, Schema expected, ResolvingDecoder in) throws IOException {
  final Object datum;
  switch (expected.getType()) {
  case RECORD:
    datum = readRecord(old, expected, in);
    break;
  case ENUM:
    datum = readEnum(expected, in);
    break;
  case ARRAY:
    datum = readArray(old, expected, in);
    break;
  case MAP:
    datum = readMap(old, expected, in);
    break;
  case UNION:
    datum = read(old, expected.getTypes().get(in.readIndex()), in);
    break;
  case FIXED:
    datum = readFixed(old, expected, in);
    break;
  case STRING:
    datum = readString(old, expected, in);
    break;
  case BYTES:
    datum = readBytes(old, expected, in);
    break;
  case INT:
    datum = readInt(old, expected, in);
    break;
  case LONG:
    datum = in.readLong();
    break;
  case FLOAT:
    datum = in.readFloat();
    break;
  case DOUBLE:
    datum = in.readDouble();
    break;
  case BOOLEAN:
    datum = in.readBoolean();
    break;
  case NULL:
    in.readNull();
    datum = null;
    break;
  default:
    throw new AvroRuntimeException("Unknown type: " + expected);
  }
  return datum;
}
/**
* Convert the underlying representation of a logical type (such as a
* ByteBuffer) to a higher-level object (such as a BigDecimal). Delegates to
* {@link Conversions#convertToLogicalType}.
*
* @param datum      the raw value to convert
* @param schema     the schema of the value
* @param type       the logical type of the value
* @param conversion the conversion to apply
* @return the result of the conversion
* @throws IllegalArgumentException if a null schema or logicalType is passed in
*                                  while datum and conversion are not null.
*                                  Note that the exception type has changed:
*                                  in version 1.8.0 and earlier the exception
*                                  thrown in this situation depended on the
*                                  implementation of the conversion (most
*                                  likely a NullPointerException). An
*                                  IllegalArgumentException is now thrown
*                                  instead.
*/
protected Object convert(Object datum, Schema schema, LogicalType type, Conversion<?> conversion) {
return Conversions.convertToLogicalType(datum, schema, type, conversion);
}
/**
 * Called to read a record instance. May be overridden for alternate record
 * representations. Fields are read in the order reported by the resolving
 * decoder; when an old instance was supplied, each field's current value is
 * fetched from the record and offered for reuse.
 *
 * @param old      an instance that may be reused, or {@code null}
 * @param expected the reader's record schema
 * @param in       the resolving decoder to read from
 * @return the populated record
 * @throws IOException if reading fails
 */
protected Object readRecord(Object old, Schema expected, ResolvingDecoder in) throws IOException {
  final Object record = data.newRecord(old, expected);
  final Object state = data.getRecordState(record, expected);
  final boolean reusing = old != null;

  for (Field field : in.readFieldOrder()) {
    final int pos = field.pos();
    final String name = field.name();
    final Object previous = reusing ? data.getField(record, name, pos, state) : null;
    readField(record, field, previous, in, state);
  }
  return record;
}
/**
 * Called to read a single field of a record. May be overridden for more
 * efficient or alternate implementations. The default reads the field value
 * against the field's schema and stores it on the record by name and position.
 *
 * @param record   the record being populated
 * @param field    the field to read
 * @param oldDatum a previous field value that may be reused, or {@code null}
 * @param in       the resolving decoder to read from
 * @param state    the record state obtained from the {@link GenericData}
 * @throws IOException if reading fails
 */
protected void readField(Object record, Field field, Object oldDatum, ResolvingDecoder in, Object state)
    throws IOException {
  final Object value = read(oldDatum, field.schema(), in);
  data.setField(record, field.name(), field.pos(), value, state);
}
/**
 * Called to read an enum value. May be overridden for alternate enum
 * representations. The default reads the symbol index from the decoder, looks
 * the symbol up in the schema and passes it to
 * {@link #createEnum(String, Schema)}.
 *
 * @param expected the reader's enum schema
 * @param in       the decoder to read from
 * @return the enum value
 * @throws IOException if reading fails
 */
protected Object readEnum(Schema expected, Decoder in) throws IOException {
  final String symbol = expected.getEnumSymbols().get(in.readEnum());
  return createEnum(symbol, expected);
}
/**
* Called to create an enum value. May be overridden for alternate enum
* representations. By default, returns a GenericEnumSymbol.
*
* @param symbol the enum symbol
* @param schema the enum schema
* @return the value created by {@link GenericData#createEnum(String, Schema)}
*/
protected Object createEnum(String symbol, Schema schema) {
return data.createEnum(symbol, schema);
}
/**
 * Called to read an array instance. May be overridden for alternate array
 * representations. Elements arrive in blocks: the first block size comes from
 * {@code readArrayStart()} and each following one from {@code arrayNext()},
 * until a non-positive size is returned. Whether elements are converted is
 * decided once, from the element schema's logical type.
 *
 * @param old      an instance that may be reused, or {@code null}
 * @param expected the reader's array schema
 * @param in       the resolving decoder to read from
 * @return the array, pruned if it is a {@link GenericArray}
 * @throws IOException if reading fails
 */
protected Object readArray(Object old, Schema expected, ResolvingDecoder in) throws IOException {
  final Schema elementSchema = expected.getElementType();
  long blockSize = in.readArrayStart();
  if (blockSize <= 0) {
    return pruneArray(newArray(old, 0, expected));
  }

  final LogicalType logicalType = elementSchema.getLogicalType();
  final Conversion<?> conversion = getData().getConversionFor(logicalType);
  final boolean convertElements = logicalType != null && conversion != null;
  final Object array = newArray(old, (int) blockSize, expected);

  long base = 0;
  while (blockSize > 0) {
    for (long i = 0; i < blockSize; i++) {
      final Object reusable = peekArray(array);
      final Object element;
      if (convertElements) {
        element = readWithConversion(reusable, elementSchema, logicalType, conversion, in);
      } else {
        element = readWithoutConversion(reusable, elementSchema, in);
      }
      addToArray(array, base + i, element);
    }
    base += blockSize;
    blockSize = in.arrayNext();
  }
  return pruneArray(array);
}
/**
 * Calls {@code prune()} on the argument when it is a {@link GenericArray} and
 * returns the argument either way.
 */
private Object pruneArray(Object object) {
  if (!(object instanceof GenericArray<?>)) {
    return object;
  }
  ((GenericArray<?>) object).prune();
  return object;
}
/**
 * Called by the default implementation of {@link #readArray} to retrieve a
 * value from a reused instance. The default implementation is for
 * {@link GenericArray}.
 *
 * @param array the array being filled
 * @return the result of {@code peek()} for a {@link GenericArray}, otherwise
 *         {@code null}
 */
@SuppressWarnings("unchecked")
protected Object peekArray(Object array) {
  if (array instanceof GenericArray) {
    return ((GenericArray) array).peek();
  }
  return null;
}
/**
 * Called by the default implementation of {@link #readArray} to add a value.
 * The default implementation is for {@link Collection}; it appends the element
 * and does not use {@code pos}.
 *
 * @param array the array being filled
 * @param pos   the index of the element within the array
 * @param e     the element to add
 */
@SuppressWarnings("unchecked")
protected void addToArray(Object array, long pos, Object e) {
  final Collection<Object> target = (Collection<Object>) array;
  target.add(e);
}
/**
 * Called to read a map instance. May be overridden for alternate map
 * representations. Entries arrive in blocks: the first block size comes from
 * {@code readMapStart()} and each following one from {@code mapNext()}, until
 * a non-positive size is returned. Keys and values are always read with no
 * instance to reuse.
 *
 * @param old      an instance that may be reused, or {@code null}
 * @param expected the reader's map schema
 * @param in       the resolving decoder to read from
 * @return the populated map
 * @throws IOException if reading fails
 */
protected Object readMap(Object old, Schema expected, ResolvingDecoder in) throws IOException {
  final Schema valueSchema = expected.getValueType();
  long blockSize = in.readMapStart();
  final LogicalType logicalType = valueSchema.getLogicalType();
  final Conversion<?> conversion = getData().getConversionFor(logicalType);
  final boolean convertValues = logicalType != null && conversion != null;
  final Object map = newMap(old, (int) blockSize);

  while (blockSize > 0) {
    for (int i = 0; i < blockSize; i++) {
      final Object key = readMapKey(null, expected, in);
      final Object value;
      if (convertValues) {
        value = readWithConversion(null, valueSchema, logicalType, conversion, in);
      } else {
        value = readWithoutConversion(null, valueSchema, in);
      }
      addToMap(map, key, value);
    }
    blockSize = in.mapNext();
  }
  return map;
}
/**
* Called by the default implementation of {@link #readMap} to read a key value.
* The default implementation delegates to
* {@link #readString(Object, Schema, Decoder)}, forwarding the schema it was
* given.
*
* @param old      an instance that may be reused, or {@code null}
* @param expected the schema forwarded to {@code readString}
* @param in       the decoder to read from
* @return the key read
* @throws IOException if reading fails
*/
protected Object readMapKey(Object old, Schema expected, Decoder in) throws IOException {
return readString(old, expected, in);
}
/**
 * Called by the default implementation of {@link #readMap} to add a key/value
 * pair. The default implementation is for {@link Map}.
 *
 * @param map   the map being filled
 * @param key   the entry key
 * @param value the entry value
 */
@SuppressWarnings("unchecked")
protected void addToMap(Object map, Object key, Object value) {
  final Map<Object, Object> target = (Map<Object, Object>) map;
  target.put(key, value);
}
/**
 * Called to read a fixed value. May be overridden for alternate fixed
 * representations. By default, returns {@link GenericFixed}: the instance is
 * obtained from the {@link GenericData} and its byte array is filled from the
 * decoder with as many bytes as the schema's fixed size.
 *
 * @param old      an instance that may be reused, or {@code null}
 * @param expected the reader's fixed schema
 * @param in       the decoder to read from
 * @return the fixed value
 * @throws IOException if reading fails
 */
protected Object readFixed(Object old, Schema expected, Decoder in) throws IOException {
  final GenericFixed fixed = (GenericFixed) data.createFixed(old, expected);
  final byte[] target = fixed.bytes();
  final int length = expected.getFixedSize();
  in.readFixed(target, 0, length);
  return fixed;
}
/**
* Called to create a fixed value. May be overridden for alternate fixed
* representations. By default, returns {@link GenericFixed}.
*
* @param old    an instance that may be reused, or {@code null}
* @param schema the fixed schema
* @return the value created by {@link GenericData#createFixed(Object, Schema)}
* @deprecated As of Avro 1.6.0 this method has been moved to
*             {@link GenericData#createFixed(Object, Schema)}
*/
@Deprecated
protected Object createFixed(Object old, Schema schema) {
return data.createFixed(old, schema);
}
/**
* Called to create a fixed value from the given bytes. May be overridden for
* alternate fixed representations. By default, returns {@link GenericFixed}.
*
* @param old    an instance that may be reused, or {@code null}
* @param bytes  the bytes forwarded to the {@link GenericData}
* @param schema the fixed schema
* @return the value created by
*         {@link GenericData#createFixed(Object, byte[], Schema)}
* @deprecated As of Avro 1.6.0 this method has been moved to
*             {@link GenericData#createFixed(Object, byte[], Schema)}
*/
@Deprecated
protected Object createFixed(Object old, byte[] bytes, Schema schema) {
return data.createFixed(old, bytes, schema);
}
/**
* Called to create new record instances. Subclasses may override to use a
* different record implementation. The returned instance must conform to the
* schema provided. If the old object contains fields not present in the schema,
* they should either be removed from the old object, or it should create a new
* instance that conforms to the schema. By default, this returns a
* {@link GenericData.Record}.
*
* @param old    an instance that may be reused, or {@code null}
* @param schema the record schema
* @return the record created by {@link GenericData#newRecord(Object, Schema)}
* @deprecated As of Avro 1.6.0 this method has been moved to
*             {@link GenericData#newRecord(Object, Schema)}
*/
@Deprecated
protected Object newRecord(Object old, Schema schema) {
return data.newRecord(old, schema);
}
/**
* Called to create new array instances. Subclasses may override to use a
* different array implementation. By default, this returns a
* {@link GenericData.Array}.
*
* @param old    an instance that may be reused, or {@code null}
* @param size   the size value forwarded to the {@link GenericData}
* @param schema the array schema
* @return the array created by the {@link GenericData}
*/
@SuppressWarnings("unchecked")
protected Object newArray(Object old, int size, Schema schema) {
return data.newArray(old, size, schema);
}
/**
* Called to create new map instances. Subclasses may override to use a
* different map implementation. By default, this returns a {@link HashMap}.
*
* @param old  an instance that may be reused, or {@code null}
* @param size the size value forwarded to the {@link GenericData}
* @return the map created by the {@link GenericData}
*/
@SuppressWarnings("unchecked")
protected Object newMap(Object old, int size) {
return data.newMap(old, size);
}
/**
 * Called to read strings. Subclasses may override to use a different string
 * representation. The class chosen for the schema decides the result: for
 * {@link CharSequence} this calls {@link #readString(Object,Decoder)}; for
 * {@link String} the decoded string is returned as is; for any other class the
 * decoded string is passed to {@link #newInstanceFromString(Class, String)}.
 *
 * @param old      an instance that may be reused, or {@code null}
 * @param expected the reader's string schema
 * @param in       the decoder to read from
 * @return the string value in the chosen representation
 * @throws IOException if reading fails
 */
protected Object readString(Object old, Schema expected, Decoder in) throws IOException {
  final Class<?> stringClass = getStringClass(expected);
  if (stringClass == CharSequence.class) {
    return readString(old, in);
  }
  final String text = in.readString();
  if (stringClass == String.class) {
    return text;
  }
  return newInstanceFromString(stringClass, text);
}
/**
 * Called to read strings. Subclasses may override to use a different string
 * representation. By default, this calls {@link Decoder#readString(Utf8)},
 * offering {@code old} for reuse only when it is a {@link Utf8}.
 *
 * @param old an instance that may be reused, or {@code null}
 * @param in  the decoder to read from
 * @return the string read
 * @throws IOException if reading fails
 */
protected Object readString(Object old, Decoder in) throws IOException {
  Utf8 reusable = null;
  if (old instanceof Utf8) {
    reusable = (Utf8) old;
  }
  return in.readString(reusable);
}
/**
* Called to create a string from a default value. Subclasses may override to
* use a different string representation. By default, this calls
* {@link Utf8#Utf8(String)}.
*
* @param value the string content
* @return a new {@link Utf8} holding {@code value}
*/
protected Object createString(String value) {
return new Utf8(value);
}
/**
 * Determines the class used to represent a string Schema. By default uses
 * {@link GenericData#STRING_PROP}: when the property is present and names the
 * {@code String} string type, {@link String} is used; in every other case
 * {@link CharSequence} is used. Subclasses may override for alternate
 * representations.
 *
 * @param schema the string schema
 * @return the class to represent the string with
 * @throws IllegalArgumentException if the property value is not the name of a
 *                                  {@link GenericData.StringType} constant
 */
protected Class findStringClass(Schema schema) {
  final String name = schema.getProp(GenericData.STRING_PROP);
  if (name != null && GenericData.StringType.valueOf(name) == GenericData.StringType.String) {
    return String.class;
  }
  return CharSequence.class;
}
private Map<Schema, Class> stringClassCache = new IdentityHashMap<>();

/**
 * Returns the string class for a schema, asking
 * {@link #findStringClass(Schema)} only when the identity-keyed cache has no
 * entry for that schema instance.
 */
private Class getStringClass(Schema s) {
  final Class cached = stringClassCache.get(s);
  if (cached != null) {
    return cached;
  }
  final Class resolved = findStringClass(s);
  stringClassCache.put(s, resolved);
  return resolved;
}
private final Map<Class, Constructor> stringCtorCache = new HashMap<>();

/**
 * Creates an instance of {@code c} by invoking its declared constructor that
 * takes a single {@link String}. The constructor is made accessible and cached
 * per class.
 *
 * @param c the class to instantiate
 * @param s the string passed to the constructor
 * @return the new instance
 * @throws AvroRuntimeException if the constructor is missing or cannot be
 *                              invoked
 */
@SuppressWarnings("unchecked")
protected Object newInstanceFromString(Class c, String s) {
  try {
    return stringConstructorFor(c).newInstance(s);
  } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException | InstantiationException e) {
    throw new AvroRuntimeException(e);
  }
}

/** Returns the cached single-String constructor of {@code c}, looking it up on first use. */
@SuppressWarnings("unchecked")
private Constructor stringConstructorFor(Class c) throws NoSuchMethodException {
  final Constructor cached = stringCtorCache.get(c);
  if (cached != null) {
    return cached;
  }
  final Constructor found = c.getDeclaredConstructor(String.class);
  found.setAccessible(true);
  stringCtorCache.put(c, found);
  return found;
}
/**
* Called to read byte arrays. Subclasses may override to use a different byte
* array representation. By default, this ignores the schema and calls
* {@link #readBytes(Object, Decoder)}.
*
* @param old an instance that may be reused, or {@code null}
* @param s   the bytes schema; not used by the default implementation
* @param in  the decoder to read from
* @return the bytes read
* @throws IOException if reading fails
*/
protected Object readBytes(Object old, Schema s, Decoder in) throws IOException {
return readBytes(old, in);
}
/**
 * Called to read byte arrays. Subclasses may override to use a different byte
 * array representation. By default, this calls
 * {@link Decoder#readBytes(ByteBuffer)}, offering {@code old} for reuse only
 * when it is a {@link ByteBuffer}.
 *
 * @param old an instance that may be reused, or {@code null}
 * @param in  the decoder to read from
 * @return the bytes read
 * @throws IOException if reading fails
 */
protected Object readBytes(Object old, Decoder in) throws IOException {
  ByteBuffer reusable = null;
  if (old instanceof ByteBuffer) {
    reusable = (ByteBuffer) old;
  }
  return in.readBytes(reusable);
}
/**
* Called to read integers. Subclasses may override to use a different integer
* representation. By default, this calls {@link Decoder#readInt()}.
*
* @param old      not used by the default implementation
* @param expected not used by the default implementation
* @param in       the decoder to read from
* @return the integer read
* @throws IOException if reading fails
*/
protected Object readInt(Object old, Schema expected, Decoder in) throws IOException {
return in.readInt();
}
/**
* Called to create byte arrays from default values. Subclasses may override to
* use a different byte array representation. By default, this calls
* {@link ByteBuffer#wrap(byte[])}.
*
* @param value the bytes to wrap
* @return a {@link ByteBuffer} wrapping {@code value}
*/
protected Object createBytes(byte[] value) {
return ByteBuffer.wrap(value);
}
/**
 * Skip an instance of a schema. Records skip each field in schema order and
 * unions skip the branch selected by the index read from the decoder. Arrays
 * and maps repeatedly ask the decoder to skip; each positive count it returns
 * is the number of items skipped one by one here.
 *
 * @param schema the schema of the instance to skip
 * @param in     the decoder positioned at the instance
 * @throws IOException      if reading fails
 * @throws RuntimeException if the schema type is not handled
 */
public static void skip(Schema schema, Decoder in) throws IOException {
  switch (schema.getType()) {
  case RECORD: {
    for (Field member : schema.getFields()) {
      skip(member.schema(), in);
    }
    break;
  }
  case ENUM:
    in.readEnum();
    break;
  case ARRAY: {
    final Schema elementSchema = schema.getElementType();
    long pending = in.skipArray();
    while (pending > 0) {
      for (long i = 0; i < pending; i++) {
        skip(elementSchema, in);
      }
      pending = in.skipArray();
    }
    break;
  }
  case MAP: {
    final Schema valueSchema = schema.getValueType();
    long pending = in.skipMap();
    while (pending > 0) {
      for (long i = 0; i < pending; i++) {
        in.skipString();
        skip(valueSchema, in);
      }
      pending = in.skipMap();
    }
    break;
  }
  case UNION:
    skip(schema.getTypes().get(in.readIndex()), in);
    break;
  case FIXED:
    in.skipFixed(schema.getFixedSize());
    break;
  case STRING:
    in.skipString();
    break;
  case BYTES:
    in.skipBytes();
    break;
  case INT:
    in.readInt();
    break;
  case LONG:
    in.readLong();
    break;
  case FLOAT:
    in.readFloat();
    break;
  case DOUBLE:
    in.readDouble();
    break;
  case BOOLEAN:
    in.readBoolean();
    break;
  case NULL:
    in.readNull();
    break;
  default:
    throw new RuntimeException("Unknown type: " + schema);
  }
}
}