blob: 96b17bbdea669719bbb05389b71b7af1ca328eb7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.reflect;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversion;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalType;
import org.apache.avro.Protocol;
import org.apache.avro.Protocol.Message;
import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryData;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.FixedSize;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.ClassUtils;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
/** Utilities to use existing Java classes and interfaces via reflection. */
public class ReflectData extends SpecificData {
/**
* Field name that marks a reflected class as a non-static inner class; when
* createSchema meets a field with this name it rejects the class.
*/
private static final String STRING_OUTER_PARENT_REFERENCE = "this$0";
/**
* Reports that custom coders are not used by this implementation.
*
* @return always {@code false}
*/
@Override
public boolean useCustomCoders() {
return false;
}
/**
 * {@link ReflectData} variant that permits null field values: the schema
 * generated for every non-primitive field is a union of null and the field's
 * declared type.
 */
public static class AllowNull extends ReflectData {
  private static final AllowNull INSTANCE = new AllowNull();

  /** Return the singleton instance. */
  public static AllowNull get() {
    return INSTANCE;
  }

  /**
   * Create the field schema as the parent does, then make it nullable unless
   * the field has a primitive type.
   */
  @Override
  protected Schema createFieldSchema(Field field, Map<String, Schema> names) {
    Schema declared = super.createFieldSchema(field, names);
    // A primitive such as int cannot hold null: a nullable schema would only
    // result in a NullPointerException at read time, so leave it unchanged.
    return field.getType().isPrimitive() ? declared : makeNullable(declared);
  }
}
/** Shared instance handed out by {@link #get()}. */
private static final ReflectData INSTANCE = new ReflectData();
/**
* No-arg constructor, intended for subclasses. Applications normally use
* {@link ReflectData#get()}.
*/
public ReflectData() {
}
/**
* Construct with a particular classloader.
*
* @param classLoader the class loader handed to the superclass constructor
*/
public ReflectData(ClassLoader classLoader) {
super(classLoader);
}
/**
* Return the singleton instance.
*
* @return the shared {@code ReflectData} instance
*/
public static ReflectData get() {
return INSTANCE;
}
/**
* Cause a class to be treated as though it had a {@link Stringable}
* annotation.
*
* @param c the class to register as stringable
* @return this instance, so calls can be chained
*/
public ReflectData addStringable(Class c) {
stringableClasses.add(c);
return this;
}
/**
* If this flag is set to true, default values for fields are assigned
* dynamically using Java reflection. When enabled, defaults are the field
* values of an instance created with a no-arg constructor.
*
* <p>
* This feature is referred to as `default reflection`. It is disabled
* initially.
*/
private boolean defaultGenerated = false;
/**
* Enable or disable `default reflection`.
*
* @param enabled set to `true` to enable the feature. This feature is disabled
* by default
* @return The current instance, so calls can be chained
*/
public ReflectData setDefaultsGenerated(boolean enabled) {
this.defaultGenerated = enabled;
return this;
}
/**
* Instances whose field values serve as schema defaults, keyed by type. Entries
* are either supplied through setDefaultGeneratedValue or created lazily by
* getOrCreateDefaultValue(Type).
*/
private final Map<Type, Object> defaultValues = new WeakHashMap<>();
/**
* Set the default value for a type. When such a type is encountered, the
* provided value is used instead of trying to create a new one.
*
* <p>
* NOTE: This method automatically enables the `default reflection` feature.
*
* @param type The type
* @param value Its default value
* @return The current instance, so calls can be chained
*/
public ReflectData setDefaultGeneratedValue(Type type, Object value) {
this.defaultValues.put(type, value);
this.setDefaultsGenerated(true);
return this;
}
/**
 * Read the default value of a field from the default instance of its owning
 * type.
 *
 * @param type  The type that declares the field
 * @param field The field whose value is wanted
 * @return The field's value on the default instance of {@code type}, or
 *         {@code null} when no default instance is available or the field
 *         cannot be read
 */
protected Object getOrCreateDefaultValue(Type type, Field field) {
  field.setAccessible(true);
  try {
    Object holder = getOrCreateDefaultValue(type);
    return (holder == null) ? null : field.get(holder);
  } catch (Exception ignored) {
    // best effort: any failure simply means "no reflected default"
    return null;
  }
}
/**
 * Get or create a new value instance for a type.
 *
 * <p>
 * New instances are created with the type's no-arg constructor (made
 * accessible if necessary). A successfully created instance is cached for
 * later use; when creation fails nothing is cached and {@code null} is
 * returned.
 *
 * @param type The type
 * @return The cached or newly created instance, or {@code null} if
 *         {@code type} is not a {@link Class} or cannot be instantiated with
 *         a no-arg constructor
 */
protected Object getOrCreateDefaultValue(Type type) {
  return this.defaultValues.computeIfAbsent(type, ignored -> {
    // Only plain classes can be instantiated; check explicitly instead of
    // relying on a ClassCastException for control flow.
    if (!(type instanceof Class)) {
      return null;
    }
    try {
      Constructor<?> constructor = ((Class<?>) type).getDeclaredConstructor();
      constructor.setAccessible(true);
      return constructor.newInstance();
    } catch (InstantiationException | IllegalAccessException | NoSuchMethodException
        | InvocationTargetException e) {
      // best effort: a type without a usable no-arg constructor has no default
      return null;
    }
  });
}
/**
* Create a {@code ReflectDatumReader} bound to this instance that uses the
* given schema in both schema positions of the reader's constructor.
*/
@Override
public DatumReader createDatumReader(Schema schema) {
return new ReflectDatumReader(schema, schema, this);
}
/**
* Create a {@code ReflectDatumReader} bound to this instance for the given
* writer and reader schemas.
*/
@Override
public DatumReader createDatumReader(Schema writer, Schema reader) {
return new ReflectDatumReader(writer, reader, this);
}
/** Create a {@code ReflectDatumWriter} bound to this instance for the given schema. */
@Override
public DatumWriter createDatumWriter(Schema schema) {
return new ReflectDatumWriter(schema, this);
}
/**
* Set a field of a record. Delegates to the state-accepting overload with no
* precomputed state, so the accessor is looked up by field name.
*/
@Override
public void setField(Object record, String name, int position, Object value) {
setField(record, name, position, value, null);
}
/**
 * Set a field of a record. {@link IndexedRecord} instances are handled by the
 * superclass; any other object is written through a reflective field accessor,
 * taken from {@code state} when supplied and looked up by name otherwise.
 *
 * @throws AvroRuntimeException wrapping any {@link IllegalAccessException} or
 *                              {@link IOException} raised by the accessor
 */
@Override
protected void setField(Object record, String name, int position, Object value, Object state) {
  if (!(record instanceof IndexedRecord)) {
    try {
      FieldAccessor accessor = getAccessorForField(record, name, position, state);
      accessor.set(record, value);
    } catch (IllegalAccessException | IOException e) {
      throw new AvroRuntimeException(e);
    }
    return;
  }
  super.setField(record, name, position, value);
}
/**
* Read a field of a record. Delegates to the state-accepting overload with no
* precomputed state, so the accessor is looked up by field name.
*/
@Override
public Object getField(Object record, String name, int position) {
return getField(record, name, position, null);
}
/**
 * Read a field of a record. {@link IndexedRecord} instances are handled by the
 * superclass; any other object is read through a reflective field accessor,
 * taken from {@code state} when supplied and looked up by name otherwise.
 *
 * @throws AvroRuntimeException wrapping any {@link IllegalAccessException}
 *                              raised by the accessor
 */
@Override
protected Object getField(Object record, String name, int pos, Object state) {
  if (!(record instanceof IndexedRecord)) {
    try {
      FieldAccessor accessor = getAccessorForField(record, name, pos, state);
      return accessor.get(record);
    } catch (IllegalAccessException e) {
      throw new AvroRuntimeException(e);
    }
  }
  return super.getField(record, name, pos);
}
/**
 * Pick the accessor for a field: by position from the precomputed accessor
 * array when one is supplied, otherwise by name from the record's class.
 */
private FieldAccessor getAccessorForField(Object record, String name, int pos, Object optionalState) {
  if (optionalState == null) {
    return getFieldAccessor(record.getClass(), name);
  }
  FieldAccessor[] accessors = (FieldAccessor[]) optionalState;
  return accessors[pos];
}
/**
 * Decide whether a datum is a record. Beyond what the superclass recognises,
 * any object that is not null, a collection, a map or a fixed is a record when
 * the schema derived from its class has type RECORD.
 */
@Override
protected boolean isRecord(Object datum) {
  if (datum == null) {
    return false;
  }
  if (super.isRecord(datum)) {
    return true;
  }
  boolean knownNonRecord = datum instanceof Collection || datum instanceof Map || datum instanceof GenericFixed;
  if (knownNonRecord) {
    return false;
  }
  Schema reflected = getSchema(datum.getClass());
  return reflected.getType() == Schema.Type.RECORD;
}
/**
 * Returns true for collections and Java arrays and false otherwise, with two
 * exceptions:
 *
 * <ul>
 * <li>non-string-keyed maps return true, because they are written as an array
 * of key/value pair records;
 * <li>arrays of bytes return false, because they are treated as the bytes
 * data type instead.
 * </ul>
 */
@Override
protected boolean isArray(Object datum) {
  if (datum == null) {
    return false;
  }
  if (datum instanceof Collection) {
    return true;
  }
  Class<?> type = datum.getClass();
  if (type.isArray() && type.getComponentType() != Byte.TYPE) {
    return true;
  }
  return isNonStringMap(datum);
}
/**
 * View an array datum as a collection: a map is exposed through its entry
 * set, anything else is cast to {@link Collection}.
 */
@Override
protected Collection getArrayAsCollection(Object datum) {
  if (datum instanceof Map) {
    return ((Map) datum).entrySet();
  }
  return (Collection) datum;
}
/**
 * Returns true for whatever the superclass treats as bytes and, additionally,
 * for Java {@code byte[]} arrays.
 */
@Override
protected boolean isBytes(Object datum) {
  if (datum == null) {
    return false;
  }
  if (super.isBytes(datum)) {
    return true;
  }
  Class<?> type = datum.getClass();
  return type.isArray() && type.getComponentType() == Byte.TYPE;
}
/**
 * Return the schema of a record: {@link GenericContainer} instances are
 * handled by the superclass, any other object gets the schema derived from
 * its class.
 */
@Override
protected Schema getRecordSchema(Object record) {
  return (record instanceof GenericContainer) ? super.getRecordSchema(record) : getSchema(record.getClass());
}
/**
 * Validate a datum against a schema, adding support for Java arrays.
 *
 * <p>
 * For an ARRAY schema and a datum that is a Java array, every element is
 * validated against the element schema. Every other combination, including a
 * {@code null} datum, is delegated to the superclass, so a null value for an
 * array field no longer fails with a {@link NullPointerException}.
 *
 * @param schema the schema to validate against
 * @param datum  the value to check, may be {@code null}
 * @return true if the datum conforms to the schema
 */
@Override
public boolean validate(Schema schema, Object datum) {
  if (schema.getType() != Schema.Type.ARRAY || datum == null || !datum.getClass().isArray()) {
    return super.validate(schema, datum);
  }
  Schema elementSchema = schema.getElementType();
  int length = java.lang.reflect.Array.getLength(datum);
  for (int i = 0; i < length; i++) {
    if (!validate(elementSchema, java.lang.reflect.Array.get(datum, i))) {
      return false;
    }
  }
  return true;
}
/**
 * Per-class cache of accessor data. Classes implementing
 * {@link IndexedRecord} get no accessor data (the cached value is null).
 */
static final ClassValue<ClassAccessorData> ACCESSOR_CACHE = new ClassValue<ClassAccessorData>() {
  @Override
  protected ClassAccessorData computeValue(Class<?> c) {
    return IndexedRecord.class.isAssignableFrom(c) ? null : new ClassAccessorData(c);
  }
};
/**
 * Field accessors of one reflected class, available by field name and, as
 * arrays ordered by schema field position, per schema.
 */
static class ClassAccessorData {
  private final Class<?> clazz;
  private final Map<String, FieldAccessor> byName = new HashMap<>();
  // getAccessorsFor is already synchronized, no need to wrap
  final Map<Schema, FieldAccessor[]> bySchema = new WeakHashMap<>();

  /**
   * Index the accessors of every field of {@code c} that is not annotated
   * with {@link AvroIgnore}, keyed by the {@link AvroName} value when present
   * and by the Java field name otherwise.
   */
  private ClassAccessorData(Class<?> c) {
    clazz = c;
    for (Field f : getFields(c, false)) {
      if (!f.isAnnotationPresent(AvroIgnore.class)) {
        FieldAccessor accessor = ReflectionUtil.getFieldAccess().getAccessor(f);
        AvroName rename = f.getAnnotation(AvroName.class);
        String key = (rename == null) ? f.getName() : rename.value();
        byName.put(key, accessor);
      }
    }
  }

  /**
   * Return the field accessors as an array, indexed by the field index of the
   * given schema. The array is built on first request and cached per schema.
   */
  private synchronized FieldAccessor[] getAccessorsFor(Schema schema) {
    // if synchronized is removed from this method, adjust bySchema appropriately
    FieldAccessor[] cached = bySchema.get(schema);
    if (cached != null) {
      return cached;
    }
    FieldAccessor[] created = createAccessorsFor(schema);
    bySchema.put(schema, created);
    return created;
  }

  /**
   * Build the position-indexed accessor array for a schema. A slot stays null
   * when no accessor is registered under the schema field's name.
   */
  private FieldAccessor[] createAccessorsFor(Schema schema) {
    List<Schema.Field> avroFields = schema.getFields();
    FieldAccessor[] accessors = new FieldAccessor[avroFields.size()];
    for (Schema.Field avroField : avroFields) {
      accessors[avroField.pos()] = byName.get(avroField.name());
    }
    return accessors;
  }

  /**
   * Look up the accessor registered under a field name.
   *
   * @throws AvroRuntimeException if no accessor is registered under that name
   */
  private FieldAccessor getAccessorFor(String fieldName) {
    FieldAccessor accessor = byName.get(fieldName);
    if (accessor != null) {
      return accessor;
    }
    throw new AvroRuntimeException("No field named " + fieldName + " in: " + clazz);
  }
}
/**
* Fetch the cached accessor data of a class from ACCESSOR_CACHE; the result is
* null for classes implementing IndexedRecord.
*/
private ClassAccessorData getClassAccessorData(Class<?> c) {
return ACCESSOR_CACHE.get(c);
}
/**
 * Return the accessors of a class ordered by the field positions of the given
 * schema, or null when the class has no accessor data.
 */
private FieldAccessor[] getFieldAccessors(Class<?> c, Schema s) {
  ClassAccessorData data = getClassAccessorData(c);
  return (data == null) ? null : data.getAccessorsFor(s);
}
/**
 * Return the accessor of the named field of a class, or null when the class
 * has no accessor data.
 */
private FieldAccessor getFieldAccessor(Class<?> c, String fieldName) {
  ClassAccessorData data = getClassAccessorData(c);
  return (data == null) ? null : data.getAccessorFor(fieldName);
}
/** @deprecated Replaced by {@link SpecificData#CLASS_PROP} */
@Deprecated
static final String CLASS_PROP = "java-class";
/** @deprecated Replaced by {@link SpecificData#KEY_CLASS_PROP} */
@Deprecated
static final String KEY_CLASS_PROP = "java-key-class";
/** @deprecated Replaced by {@link SpecificData#ELEMENT_PROP} */
@Deprecated
static final String ELEMENT_PROP = "java-element-class";
/** Classes already resolved by getClassProp, keyed by class name. */
private static final Map<String, Class> CLASS_CACHE = new ConcurrentHashMap<>();
/**
 * Resolve the class named by a schema property.
 *
 * @param schema the schema carrying the property
 * @param prop   the property whose value is a class name
 * @return the class, served from the cache when already resolved, or null if
 *         the schema does not have the property
 * @throws AvroRuntimeException if the named class cannot be found
 */
static Class getClassProp(Schema schema, String prop) {
  String className = schema.getProp(prop);
  if (className == null) {
    return null;
  }
  Class resolved = CLASS_CACHE.get(className);
  if (resolved == null) {
    try {
      resolved = ClassUtils.forName(className);
      CLASS_CACHE.put(className, resolved);
    } catch (ClassNotFoundException e) {
      throw new AvroRuntimeException(e);
    }
  }
  return resolved;
}
/** Java class returned by getClass for BYTES schemas. */
private static final Class BYTES_CLASS = byte[].class;
/** Maps each primitive type to its array class; consulted by getClass for arrays of primitives. */
private static final IdentityHashMap<Class, Class> ARRAY_CLASSES;
static {
ARRAY_CLASSES = new IdentityHashMap<>();
ARRAY_CLASSES.put(byte.class, byte[].class);
ARRAY_CLASSES.put(char.class, char[].class);
ARRAY_CLASSES.put(short.class, short[].class);
ARRAY_CLASSES.put(int.class, int[].class);
ARRAY_CLASSES.put(long.class, long[].class);
ARRAY_CLASSES.put(float.class, float[].class);
ARRAY_CLASSES.put(double.class, double[].class);
ARRAY_CLASSES.put(boolean.class, boolean[].class);
}
/**
* Returns true only for maps that are not non-string-keyed maps. Non-string
* maps are excluded because Avro writes them out as an array of records; even
* their JSON representation is an array.
*/
@Override
protected boolean isMap(Object datum) {
return (datum instanceof Map) && !isNonStringMap(datum);
}
/*
 * Without the Field or Schema corresponding to the datum it is not possible
 * to determine the key type accurately, so the class of the first key is
 * inspected instead. An empty map is reported as a string-keyed map; for an
 * empty map the distinction does not matter.
 */
private boolean isNonStringMap(Object datum) {
  if (!(datum instanceof Map)) {
    return false;
  }
  Map map = (Map) datum;
  if (map.size() <= 0) {
    return false;
  }
  Class firstKeyClass = map.keySet().iterator().next().getClass();
  return !(isStringable(firstKeyClass) || isStringType(firstKeyClass));
}
/**
 * Return the Java class used to represent data of the given schema.
 *
 * <p>
 * A registered conversion for the schema's logical type wins. Otherwise
 * arrays and strings honour the class named by the {@code java-class}
 * property, arrays fall back to a Java array of the element class, bytes map
 * to {@code byte[]}, and ints map to {@code byte}, {@code short} or
 * {@code char} when the {@code java-class} property names the matching
 * wrapper. Everything else is resolved by the superclass.
 */
@Override
public Class getClass(Schema schema) {
  // see if the element class will be converted and use that class
  Conversion<?> conversion = getConversionFor(schema.getLogicalType());
  if (conversion != null) {
    return conversion.getConvertedType();
  }
  switch (schema.getType()) {
  case ARRAY: {
    Class declaredCollection = getClassProp(schema, CLASS_PROP);
    if (declaredCollection != null) {
      return declaredCollection;
    }
    Class elementClass = getClass(schema.getElementType());
    if (!elementClass.isPrimitive()) {
      return java.lang.reflect.Array.newInstance(elementClass, 0).getClass();
    }
    // avoid expensive translation to array type when primitive
    return ARRAY_CLASSES.get(elementClass);
  }
  case STRING: {
    Class declaredString = getClassProp(schema, CLASS_PROP);
    if (declaredString == null) {
      return String.class;
    }
    return declaredString;
  }
  case BYTES:
    return BYTES_CLASS;
  case INT: {
    String declaredInt = schema.getProp(CLASS_PROP);
    if (Byte.class.getName().equals(declaredInt)) {
      return Byte.TYPE;
    }
    if (Short.class.getName().equals(declaredInt)) {
      return Short.TYPE;
    }
    if (Character.class.getName().equals(declaredInt)) {
      return Character.TYPE;
    }
    // a plain int is resolved like every other type
    return super.getClass(schema);
  }
  default:
    return super.getClass(schema);
  }
}
// Constants describing the key/value pair record used to represent
// non-string-keyed maps as an array of records.
static final String NS_MAP_ARRAY_RECORD = // record name prefix
"org.apache.avro.reflect.Pair";
static final String NS_MAP_KEY = "key"; // name of key field
static final int NS_MAP_KEY_INDEX = 0; // index of key field
static final String NS_MAP_VALUE = "value"; // name of value field
static final int NS_MAP_VALUE_INDEX = 1; // index of value field
/*
 * Non-string map keys need special handling: such a map is represented as an
 * array of records of the form [{"key":{...}, "value":{...}}]. The key schema
 * is created before the value schema.
 */
Schema createNonStringMapSchema(Type keyType, Type valueType, Map<String, Schema> names) {
  Schema keySchema = createSchema(keyType, names);
  Schema valueSchema = createSchema(valueType, names);
  List<Schema.Field> pairFields = Arrays.asList(new Schema.Field(NS_MAP_KEY, keySchema, null, null),
      new Schema.Field(NS_MAP_VALUE, valueSchema, null, null));
  String pairName = getNameForNonStringMapRecord(keyType, valueType, keySchema, valueSchema);
  Schema pairRecord = Schema.createRecord(pairName, null, null, false);
  pairRecord.setFields(pairFields);
  return Schema.createArray(pairRecord);
}
/*
 * Gets a unique and consistent name per key-value pair. So if the same
 * key-value are seen in another map, the same name is generated again.
 *
 * When both types are classes from a java* package the name is the pair
 * prefix followed by both simple class names. Otherwise it is the prefix
 * followed by the hexadecimal magnitude of a 64-bit fingerprint of the two
 * schemas' full names.
 */
private String getNameForNonStringMapRecord(Type keyType, Type valueType, Schema keySchema, Schema valueSchema) {
  // Generate a nice name for classes in java* package
  if (keyType instanceof Class && valueType instanceof Class) {
    Class keyClass = (Class) keyType;
    Class valueClass = (Class) valueType;
    Package pkg1 = keyClass.getPackage();
    Package pkg2 = valueClass.getPackage();
    if (pkg1 != null && pkg1.getName().startsWith("java") && pkg2 != null && pkg2.getName().startsWith("java")) {
      return NS_MAP_ARRAY_RECORD + keyClass.getSimpleName() + valueClass.getSimpleName();
    }
  }
  String name = keySchema.getFullName() + valueSchema.getFullName();
  long fingerprint = SchemaNormalization.fingerprint64(name.getBytes(StandardCharsets.UTF_8));
  if (fingerprint == Long.MIN_VALUE) {
    // -Long.MIN_VALUE overflows back to Long.MIN_VALUE, which would leave a
    // '-' sign in the generated name; clamp to the largest positive value.
    fingerprint = Long.MAX_VALUE;
  } else if (fingerprint < 0) {
    fingerprint = -fingerprint; // ignore sign
  }
  String fpString = Long.toString(fingerprint, 16); // hex
  return NS_MAP_ARRAY_RECORD + fpString;
}
/**
 * Tell whether a schema is the array-of-pairs representation of a
 * non-string-keyed map: an ARRAY schema whose {@code java-class} property
 * names a {@link Map} implementation.
 */
static boolean isNonStringMapSchema(Schema s) {
  if (s == null || s.getType() != Schema.Type.ARRAY) {
    return false;
  }
  Class declared = getClassProp(s, CLASS_PROP);
  return declared != null && Map.class.isAssignableFrom(declared);
}
/**
 * Get the default value for a schema field. Derived classes can override this
 * method to provide values based on object instantiation.
 *
 * <p>
 * Sources are tried in this order: the reflected default (only when default
 * reflection is enabled), the {@link AvroDefault} annotation, and finally a
 * null default for unions whose first branch is null.
 *
 * @param type        Type declaring the field
 * @param field       Field
 * @param fieldSchema Schema of the field
 * @return The default value, or {@code null} if none applies
 */
protected Object createSchemaDefaultValue(Type type, Field field, Schema fieldSchema) {
  if (defaultGenerated) {
    Object reflected = getOrCreateDefaultValue(type, field);
    if (reflected != null) {
      return deepCopy(fieldSchema, reflected);
    }
    // no reflected default available: fall back to the annotation-based logic
  }
  AvroDefault annotation = field.getAnnotation(AvroDefault.class);
  if (annotation != null) {
    Object parsed = Schema.parseJsonToObject(annotation.value());
    if (parsed != null) {
      return parsed;
    }
  }
  if (fieldSchema.getType() == Schema.Type.UNION
      && fieldSchema.getTypes().get(0).getType() == Schema.Type.NULL) {
    return JsonProperties.NULL_VALUE;
  }
  return null;
}
/**
 * Create the schema for a Java type via reflection.
 *
 * <p>
 * Handled here: generic arrays, parameterized maps and collections, the
 * small integral types (byte, short, char, mapped to INT with a
 * {@code java-class} property), Java arrays, classes carrying an explicit
 * {@link AvroSchema}, character sequences, byte buffers, classes with a
 * registered conversion, {@link Union}-annotated and stringable classes,
 * enums, fixed types and plain classes, which become records built from their
 * fields. Anything else is delegated to the superclass.
 *
 * @param type  the Java type to describe
 * @param names named schemas created so far, keyed by class name; named
 *              schemas created by this call are added
 * @return the schema for {@code type}
 * @throws AvroTypeException    for a parameterized collection without exactly
 *                              one type argument, a fixed class without
 *                              {@link FixedSize}, a non-static inner class,
 *                              duplicate field names or duplicate property
 *                              keys
 * @throws AvroRuntimeException for a raw collection class, whose element type
 *                              cannot be determined
 */
@Override
protected Schema createSchema(Type type, Map<String, Schema> names) {
  if (type instanceof GenericArrayType) { // generic array
    Type component = ((GenericArrayType) type).getGenericComponentType();
    if (component == Byte.TYPE) // byte array
      return Schema.create(Schema.Type.BYTES);
    Schema result = Schema.createArray(createSchema(component, names));
    setElement(result, component);
    return result;
  } else if (type instanceof ParameterizedType) {
    ParameterizedType ptype = (ParameterizedType) type;
    Class raw = (Class) ptype.getRawType();
    Type[] params = ptype.getActualTypeArguments();
    if (Map.class.isAssignableFrom(raw)) { // Map
      Class key = (Class) params[0];
      if (isStringable(key)) { // Stringable key
        Schema schema = Schema.createMap(createSchema(params[1], names));
        schema.addProp(KEY_CLASS_PROP, key.getName());
        return schema;
      } else if (key != String.class) {
        Schema schema = createNonStringMapSchema(params[0], params[1], names);
        schema.addProp(CLASS_PROP, raw.getName());
        return schema;
      }
      // String-keyed maps fall through to the superclass at the end
    } else if (Collection.class.isAssignableFrom(raw)) { // Collection
      if (params.length != 1)
        throw new AvroTypeException("No array type specified.");
      Schema schema = Schema.createArray(createSchema(params[0], names));
      schema.addProp(CLASS_PROP, raw.getName());
      return schema;
    }
  } else if ((type == Byte.class) || (type == Byte.TYPE)) {
    Schema result = Schema.create(Schema.Type.INT);
    result.addProp(CLASS_PROP, Byte.class.getName());
    return result;
  } else if ((type == Short.class) || (type == Short.TYPE)) {
    Schema result = Schema.create(Schema.Type.INT);
    result.addProp(CLASS_PROP, Short.class.getName());
    return result;
  } else if ((type == Character.class) || (type == Character.TYPE)) {
    Schema result = Schema.create(Schema.Type.INT);
    result.addProp(CLASS_PROP, Character.class.getName());
    return result;
  } else if (type instanceof Class) { // Class
    Class<?> c = (Class<?>) type;
    if (c.isPrimitive() || // primitives
        c == Void.class || c == Boolean.class || c == Integer.class || c == Long.class || c == Float.class
        || c == Double.class || c == Byte.class || c == Short.class || c == Character.class)
      return super.createSchema(type, names);
    if (c.isArray()) { // array
      Class component = c.getComponentType();
      if (component == Byte.TYPE) { // byte array
        Schema result = Schema.create(Schema.Type.BYTES);
        result.addProp(CLASS_PROP, c.getName());
        return result;
      }
      Schema result = Schema.createArray(createSchema(component, names));
      result.addProp(CLASS_PROP, c.getName());
      setElement(result, component);
      return result;
    }
    AvroSchema explicit = c.getAnnotation(AvroSchema.class);
    if (explicit != null) // explicit schema
      return new Schema.Parser().parse(explicit.value());
    if (CharSequence.class.isAssignableFrom(c)) // String
      return Schema.create(Schema.Type.STRING);
    if (ByteBuffer.class.isAssignableFrom(c)) // bytes
      return Schema.create(Schema.Type.BYTES);
    if (Collection.class.isAssignableFrom(c)) // array
      throw new AvroRuntimeException("Can't find element type of Collection");
    Conversion<?> conversion = getConversionByClass(c);
    if (conversion != null) {
      return conversion.getRecommendedSchema();
    }
    String fullName = c.getName();
    Schema schema = names.get(fullName);
    if (schema == null) {
      AvroDoc annotatedDoc = c.getAnnotation(AvroDoc.class); // Docstring
      String doc = (annotatedDoc != null) ? annotatedDoc.value() : null;
      String name = c.getSimpleName();
      String space = c.getPackage() == null ? "" : c.getPackage().getName();
      if (c.getEnclosingClass() != null) // nested class
        space = c.getEnclosingClass().getName();
      Union union = c.getAnnotation(Union.class);
      if (union != null) { // union annotated
        return getAnnotatedUnion(union, names);
      } else if (isStringable(c)) { // Stringable
        Schema result = Schema.create(Schema.Type.STRING);
        result.addProp(CLASS_PROP, c.getName());
        return result;
      } else if (c.isEnum()) { // Enum
        List<String> symbols = new ArrayList<>();
        Enum[] constants = (Enum[]) c.getEnumConstants();
        for (Enum constant : constants)
          symbols.add(constant.name());
        schema = Schema.createEnum(name, doc, space, symbols);
        consumeAvroAliasAnnotation(c, schema);
      } else if (GenericFixed.class.isAssignableFrom(c)) { // fixed
        // the size of a fixed type is only available from its annotation;
        // report a missing one clearly instead of failing with a bare NPE
        FixedSize fixedSize = c.getAnnotation(FixedSize.class);
        if (fixedSize == null)
          throw new AvroTypeException("Fixed class " + fullName + " must be annotated with @FixedSize");
        int size = fixedSize.value();
        schema = Schema.createFixed(name, doc, space, size);
        consumeAvroAliasAnnotation(c, schema);
      } else if (IndexedRecord.class.isAssignableFrom(c)) { // specific
        return super.createSchema(type, names);
      } else { // record
        List<Schema.Field> fields = new ArrayList<>();
        boolean error = Throwable.class.isAssignableFrom(c);
        schema = Schema.createRecord(name, doc, space, error);
        consumeAvroAliasAnnotation(c, schema);
        // register the record before visiting its fields so that references
        // back to this class resolve to the schema being built
        names.put(c.getName(), schema);
        for (Field field : getCachedFields(c))
          if ((field.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC)) == 0
              && !field.isAnnotationPresent(AvroIgnore.class)) {
            Schema fieldSchema = createFieldSchema(field, names);
            annotatedDoc = field.getAnnotation(AvroDoc.class); // Docstring
            doc = (annotatedDoc != null) ? annotatedDoc.value() : null;
            Object defaultValue = createSchemaDefaultValue(type, field, fieldSchema);
            AvroName annotatedName = field.getAnnotation(AvroName.class); // Rename fields
            String fieldName = (annotatedName != null) ? annotatedName.value() : field.getName();
            if (STRING_OUTER_PARENT_REFERENCE.equals(fieldName)) {
              throw new AvroTypeException("Class " + fullName + " must be a static inner class");
            }
            Schema.Field recordField = new Schema.Field(fieldName, fieldSchema, doc, defaultValue);
            AvroMeta[] metadata = field.getAnnotationsByType(AvroMeta.class); // add metadata
            for (AvroMeta meta : metadata) {
              if (recordField.getObjectProps().containsKey(meta.key())) {
                throw new AvroTypeException("Duplicate field prop key: " + meta.key());
              }
              recordField.addProp(meta.key(), meta.value());
            }
            for (Schema.Field f : fields) {
              if (f.name().equals(fieldName))
                throw new AvroTypeException("double field entry: " + fieldName);
            }
            consumeFieldAlias(field, recordField);
            fields.add(recordField);
          }
        if (error) // add Throwable message
          fields.add(new Schema.Field("detailMessage", THROWABLE_MESSAGE, null, null));
        schema.setFields(fields);
        AvroMeta[] metadata = c.getAnnotationsByType(AvroMeta.class);
        for (AvroMeta meta : metadata) {
          if (schema.getObjectProps().containsKey(meta.key())) {
            throw new AvroTypeException("Duplicate type prop key: " + meta.key());
          }
          schema.addProp(meta.key(), meta.value());
        }
      }
      names.put(fullName, schema);
    }
    return schema;
  }
  return super.createSchema(type, names);
}
/**
* A class is stringable when it carries the {@link Stringable} annotation or
* when the superclass already considers it stringable.
*/
@Override
protected boolean isStringable(Class<?> c) {
return c.isAnnotationPresent(Stringable.class) || super.isStringable(c);
}
/** Nullable string schema of the "detailMessage" field that createSchema adds to error records. */
private static final Schema THROWABLE_MESSAGE = makeNullable(Schema.create(Schema.Type.STRING));
// If the array element type is a class with a union annotation, record the
// class name on the array schema. This is required because a property cannot
// be set on the union itself.
private void setElement(Schema schema, Type element) {
  if (element instanceof Class) {
    Class<?> elementClass = (Class<?>) element;
    boolean annotatedUnion = elementClass.getAnnotation(Union.class) != null;
    if (annotatedUnion) {
      schema.addProp(ELEMENT_PROP, elementClass.getName());
    }
  }
}
// Construct a union schema from a union annotation: one branch per class
// listed in the annotation, in declaration order.
private Schema getAnnotatedUnion(Union union, Map<String, Schema> names) {
  Class[] branchClasses = union.value();
  List<Schema> branchSchemas = new ArrayList<>();
  for (int i = 0; i < branchClasses.length; i++) {
    branchSchemas.add(createSchema(branchClasses[i], names));
  }
  return Schema.createUnion(branchSchemas);
}
/**
 * Create and return a union of the null schema and the provided schema.
 *
 * <p>
 * A union that already has a null branch is returned unchanged; any other
 * union is copied into a new union with null as its first branch.
 *
 * @param schema the schema to make nullable
 * @return a schema accepting null in addition to {@code schema}
 */
public static Schema makeNullable(Schema schema) {
  if (schema.getType() != Schema.Type.UNION) {
    // create a union with null
    return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), schema));
  }
  List<Schema> branches = schema.getTypes();
  // check to see if the union already contains NULL
  for (Schema branch : branches) {
    if (branch.getType() == Schema.Type.NULL) {
      return schema;
    }
  }
  // add null as the first type in a new union
  List<Schema> withNull = new ArrayList<>();
  withNull.add(Schema.create(Schema.Type.NULL));
  withNull.addAll(branches);
  return Schema.createUnion(withNull);
}
/** Serializable fields per class, filled on demand by getCachedFields. */
private static final Map<Class<?>, Field[]> FIELDS_CACHE = new ConcurrentHashMap<>();
// Return the fields of this class and its superclasses to serialize. Results
// are cached per class in FIELDS_CACHE; fields of java built-in classes are
// excluded.
private static Field[] getCachedFields(Class<?> recordClass) {
return FIELDS_CACHE.computeIfAbsent(recordClass, rc -> getFields(rc, true));
}
/**
 * Collect the non-static, non-transient fields declared by a class and its
 * superclasses. Classes are visited from {@code recordClass} upwards and the
 * fields of each class are ordered by name.
 *
 * @param recordClass the class to inspect
 * @param excludeJava when true, stop at the first class whose package name
 *                    starts with {@code java.}
 * @return the collected fields
 * @throws AvroTypeException if two collected fields share a name
 */
private static Field[] getFields(Class<?> recordClass, boolean excludeJava) {
  Map<String, Field> collected = new LinkedHashMap<>();
  Class<?> current = recordClass;
  do {
    Package pkg = current.getPackage();
    if (excludeJava && pkg != null && pkg.getName().startsWith("java.")) {
      break; // skip java built-in classes
    }
    Field[] declared = current.getDeclaredFields();
    Arrays.sort(declared, Comparator.comparing(Field::getName));
    for (Field field : declared) {
      int modifiers = field.getModifiers();
      if (Modifier.isTransient(modifiers) || Modifier.isStatic(modifiers)) {
        continue;
      }
      Field previous = collected.put(field.getName(), field);
      if (previous != null) {
        throw new AvroTypeException(current + " contains two fields named: " + field);
      }
    }
    current = current.getSuperclass();
  } while (current != null);
  return collected.values().toArray(new Field[0]);
}
/**
 * Create a schema for a field.
 *
 * <p>
 * Annotations are honoured in this order: {@link AvroEncode} (schema supplied
 * by the custom encoder), {@link AvroSchema} (explicit schema) and
 * {@link Union}. Without any of them the schema is derived from the field's
 * generic type, replaced by a string schema for {@link Stringable} fields and
 * made nullable for {@link Nullable} fields.
 *
 * @param field the field to describe
 * @param names named schemas created so far
 * @return the schema of the field
 * @throws AvroRuntimeException if the custom encoder cannot be instantiated
 *                              or fails to provide its schema; the original
 *                              failure is kept as the cause
 */
protected Schema createFieldSchema(Field field, Map<String, Schema> names) {
  AvroEncode enc = field.getAnnotation(AvroEncode.class);
  if (enc != null) {
    try {
      return enc.using().getDeclaredConstructor().newInstance().getSchema();
    } catch (Exception e) {
      // keep the underlying failure so the root cause is not lost
      throw new AvroRuntimeException("Could not create schema from custom serializer for " + field.getName(), e);
    }
  }
  AvroSchema explicit = field.getAnnotation(AvroSchema.class);
  if (explicit != null) { // explicit schema
    return new Schema.Parser().parse(explicit.value());
  }
  Union union = field.getAnnotation(Union.class);
  if (union != null) {
    return getAnnotatedUnion(union, names);
  }
  Schema schema = createSchema(field.getGenericType(), names);
  if (field.isAnnotationPresent(Stringable.class)) { // Stringable
    schema = Schema.create(Schema.Type.STRING);
  }
  if (field.isAnnotationPresent(Nullable.class)) { // nullable
    schema = makeNullable(schema);
  }
  return schema;
}
/**
 * Return the protocol for a Java interface. Every non-static method of the
 * interface becomes one message.
 *
 * <p>
 * The correct name of the method parameters needs the <code>-parameters</code>
 * java compiler argument. More info at https://openjdk.java.net/jeps/118
 *
 * @throws AvroTypeException if two methods share a name
 */
@Override
public Protocol getProtocol(Class iface) {
  Package pkg = iface.getPackage();
  String namespace = (pkg == null) ? "" : pkg.getName();
  Protocol protocol = new Protocol(iface.getSimpleName(), namespace);
  Map<String, Schema> names = new LinkedHashMap<>();
  Map<String, Message> messages = protocol.getMessages();
  Map<TypeVariable<?>, Type> genericTypeVariableMap = ReflectionUtil.resolveTypeVariables(iface);
  for (Method method : iface.getMethods()) {
    if (Modifier.isStatic(method.getModifiers())) {
      continue;
    }
    String messageName = method.getName();
    if (messages.containsKey(messageName)) {
      throw new AvroTypeException("Two methods with same name: " + messageName);
    }
    messages.put(messageName, getMessage(method, protocol, names, genericTypeVariableMap));
  }
  // reverse types, since they were defined in reference order
  List<Schema> types = new ArrayList<>(names.values());
  Collections.reverse(types);
  protocol.setTypes(types);
  return protocol;
}
/**
 * Build the protocol message for one interface method.
 *
 * <p>
 * The request is a record with one field per parameter, the response is the
 * schema of the return type, and the errors are a union of the system error
 * and the declared exception types. {@link AvroSchema}, {@link Union} and
 * {@link Nullable} annotations on parameters and on the method adjust the
 * corresponding schema.
 */
private Message getMessage(Method method, Protocol protocol, Map<String, Schema> names,
    Map<? extends Type, Type> genericTypeMap) {
  List<Schema.Field> requestFields = new ArrayList<>();
  for (Parameter parameter : method.getParameters()) {
    Type resolvedType = genericTypeMap.getOrDefault(parameter.getParameterizedType(), parameter.getType());
    Schema paramSchema = getSchema(resolvedType, names);
    // applied in the order getAnnotations() returns them
    for (Annotation annotation : parameter.getAnnotations()) {
      if (annotation instanceof AvroSchema) { // explicit schema
        paramSchema = new Schema.Parser().parse(((AvroSchema) annotation).value());
      } else if (annotation instanceof Union) { // union
        paramSchema = getAnnotatedUnion(((Union) annotation), names);
      } else if (annotation instanceof Nullable) { // nullable
        paramSchema = makeNullable(paramSchema);
      }
    }
    requestFields.add(new Schema.Field(parameter.getName(), paramSchema, null /* doc */, null));
  }
  Schema request = Schema.createRecord(requestFields);

  Type declaredReturn = method.getGenericReturnType();
  Type returnType = genericTypeMap.getOrDefault(declaredReturn, declaredReturn);
  Union union = method.getAnnotation(Union.class);
  Schema response;
  if (union != null) {
    response = getAnnotatedUnion(union, names);
  } else {
    response = getSchema(returnType, names);
  }
  if (method.isAnnotationPresent(Nullable.class)) { // nullable
    response = makeNullable(response);
  }
  AvroSchema explicit = method.getAnnotation(AvroSchema.class);
  if (explicit != null) { // explicit schema
    response = new Schema.Parser().parse(explicit.value());
  }

  List<Schema> errorSchemas = new ArrayList<>();
  errorSchemas.add(Protocol.SYSTEM_ERROR); // every method can throw
  for (Type thrown : method.getGenericExceptionTypes()) {
    errorSchemas.add(getSchema(thrown, names));
  }
  Schema errors = Schema.createUnion(errorSchemas);
  return protocol.createMessage(method.getName(), null /* doc */, Collections.emptyMap() /* propMap */, request,
      response, errors);
}
/**
* Create the schema for a type, rewrapping any {@link AvroTypeException} so
* that its message names the type being processed. The original exception is
* kept as the cause.
*/
private Schema getSchema(Type type, Map<String, Schema> names) {
try {
return createSchema(type, names);
} catch (AvroTypeException e) { // friendly exception
throw new AvroTypeException("Error getting schema for " + type + ": " + e.getMessage(), e);
}
}
/**
 * Compare two data, adding support for Java arrays.
 *
 * <p>
 * When the first datum is a Java array, ARRAY data are compared element by
 * element and then by length, and BYTES data are compared as byte arrays.
 * Every other case is delegated to the superclass.
 */
@Override
protected int compare(Object o1, Object o2, Schema s, boolean equals) {
  switch (s.getType()) {
  case ARRAY:
    if (o1.getClass().isArray()) {
      Schema elementType = s.getElementType();
      int length1 = java.lang.reflect.Array.getLength(o1);
      int length2 = java.lang.reflect.Array.getLength(o2);
      int shared = Math.min(length1, length2);
      for (int i = 0; i < shared; i++) {
        int order = compare(java.lang.reflect.Array.get(o1, i), java.lang.reflect.Array.get(o2, i), elementType,
            equals);
        if (order != 0) {
          return order;
        }
      }
      // equal common prefix: the shorter array sorts first
      return Integer.compare(length1, length2);
    }
    break;
  case BYTES:
    if (o1.getClass().isArray()) {
      byte[] bytes1 = (byte[]) o1;
      byte[] bytes2 = (byte[]) o2;
      return BinaryData.compareBytes(bytes1, 0, bytes1.length, bytes2, 0, bytes2.length);
    }
    break;
  default:
    break;
  }
  return super.compare(o1, o2, s, equals);
}
/**
* Return the field accessors of the record's class, ordered by the field
* positions of the given schema. This is the array that the state-accepting
* getField/setField overloads index by position. The result is null when the
* record's class has no accessor data.
*/
@Override
protected Object getRecordState(Object record, Schema schema) {
return getFieldAccessors(record.getClass(), schema);
}
/**
 * Copy every {@link AvroAlias} declared on a class onto its schema. An alias
 * whose namespace equals {@code AvroAlias.NULL} is added with a null
 * namespace.
 */
private void consumeAvroAliasAnnotation(Class<?> c, Schema schema) {
  for (AvroAlias alias : c.getAnnotationsByType(AvroAlias.class)) {
    String declaredSpace = alias.space();
    String space = AvroAlias.NULL.equals(declaredSpace) ? null : declaredSpace;
    schema.addAlias(alias.alias(), space);
  }
}
/**
 * Copy every {@link AvroAlias} declared on a Java field onto the schema field.
 *
 * @throws AvroRuntimeException if an alias specifies a namespace, which is not
 *                              allowed on field aliases
 */
private void consumeFieldAlias(Field field, Schema.Field recordField) {
  for (AvroAlias alias : field.getAnnotationsByType(AvroAlias.class)) {
    boolean hasNamespace = !alias.space().equals(AvroAlias.NULL);
    if (hasNamespace) {
      throw new AvroRuntimeException(
          "Namespaces are not allowed on field aliases. " + "Offending field: " + recordField.name());
    }
    recordField.addAlias(alias.alias());
  }
}
/**
 * Create a fixed instance for a schema. When the schema has a logical type
 * with a registered conversion, a generic fixed is returned; otherwise
 * creation is left to the superclass.
 */
@Override
public Object createFixed(Object old, Schema schema) {
  // SpecificData will try to instantiate the type returned by getClass, but
  // that is the converted class and can't be constructed.
  LogicalType logicalType = schema.getLogicalType();
  boolean converted = logicalType != null && getConversionFor(logicalType) != null;
  return converted ? new GenericData.Fixed(schema) : super.createFixed(old, schema);
}
/**
 * Create a record instance for a schema. When the schema has a logical type
 * with a registered conversion, a generic record is returned; otherwise
 * creation is left to the superclass.
 */
@Override
public Object newRecord(Object old, Schema schema) {
  // SpecificData will try to instantiate the type returned by getClass, but
  // that is the converted class and can't be constructed.
  LogicalType logicalType = schema.getLogicalType();
  boolean converted = logicalType != null && getConversionFor(logicalType) != null;
  return converted ? new GenericData.Record(schema) : super.newRecord(old, schema);
}
}