| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.avro.reflect; |
| |
| import org.apache.avro.AvroRuntimeException; |
| import org.apache.avro.AvroTypeException; |
| import org.apache.avro.Conversion; |
| import org.apache.avro.JsonProperties; |
| import org.apache.avro.LogicalType; |
| import org.apache.avro.Protocol; |
| import org.apache.avro.Protocol.Message; |
| import org.apache.avro.Schema; |
| import org.apache.avro.SchemaNormalization; |
| import org.apache.avro.generic.GenericContainer; |
| import org.apache.avro.generic.GenericData; |
| import org.apache.avro.generic.GenericFixed; |
| import org.apache.avro.generic.IndexedRecord; |
| import org.apache.avro.io.BinaryData; |
| import org.apache.avro.io.DatumReader; |
| import org.apache.avro.io.DatumWriter; |
| import org.apache.avro.specific.FixedSize; |
| import org.apache.avro.specific.SpecificData; |
| import org.apache.avro.util.ClassUtils; |
| |
| import java.io.IOException; |
| import java.lang.annotation.Annotation; |
| import java.lang.reflect.Constructor; |
| import java.lang.reflect.Field; |
| import java.lang.reflect.GenericArrayType; |
| import java.lang.reflect.InvocationTargetException; |
| import java.lang.reflect.Method; |
| import java.lang.reflect.Modifier; |
| import java.lang.reflect.Parameter; |
| import java.lang.reflect.ParameterizedType; |
| import java.lang.reflect.Type; |
| import java.lang.reflect.TypeVariable; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.IdentityHashMap; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.WeakHashMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| /** Utilities to use existing Java classes and interfaces via reflection. */ |
| public class ReflectData extends SpecificData { |
| |
| private static final String STRING_OUTER_PARENT_REFERENCE = "this$0"; |
| |
| @Override |
| public boolean useCustomCoders() { |
| return false; |
| } |
| |
| /** |
| * {@link ReflectData} implementation that permits null field values. The schema |
| * generated for each field is a union of its declared type and null. |
| */ |
| public static class AllowNull extends ReflectData { |
| |
| private static final AllowNull INSTANCE = new AllowNull(); |
| |
| /** Return the singleton instance. */ |
| public static AllowNull get() { |
| return INSTANCE; |
| } |
| |
| @Override |
| protected Schema createFieldSchema(Field field, Map<String, Schema> names) { |
| Schema schema = super.createFieldSchema(field, names); |
| if (field.getType().isPrimitive()) { |
| // for primitive values, such as int, a null will result in a |
| // NullPointerException at read time |
| return schema; |
| } |
| return makeNullable(schema); |
| } |
| } |
| |
| private static final ReflectData INSTANCE = new ReflectData(); |
| |
| /** For subclasses. Applications normally use {@link ReflectData#get()}. */ |
| public ReflectData() { |
| } |
| |
| /** Construct with a particular classloader. */ |
| public ReflectData(ClassLoader classLoader) { |
| super(classLoader); |
| } |
| |
| /** Return the singleton instance. */ |
| public static ReflectData get() { |
| return INSTANCE; |
| } |
| |
| /** |
| * Cause a class to be treated as though it had an {@link Stringable} * |
| * annotation. |
| */ |
| public ReflectData addStringable(Class c) { |
| stringableClasses.add(c); |
| return this; |
| } |
| |
| /** |
| * If this flag is set to true, default values for fields will be assigned |
| * dynamically using Java reflections. When enabled, defaults are the field |
| * values of an instance created with a no-arg constructor. |
| * |
| * <p> |
| * Let's call this feature `default reflection`. Initially this feature is |
| * disabled. |
| */ |
| private boolean defaultGenerated = false; |
| |
| /** |
| * Enable or disable `default reflection` |
| * |
| * @param enabled set to `true` to enable the feature. This feature is disabled |
| * by default |
| * @return The current instance |
| */ |
| public ReflectData setDefaultsGenerated(boolean enabled) { |
| this.defaultGenerated = enabled; |
| return this; |
| } |
| |
| private final Map<Type, Object> defaultValues = new WeakHashMap<>(); |
| |
| /** |
| * Set the default value for a type. When encountering such type, we'll use this |
| * provided value instead of trying to create a new one. |
| * |
| * <p> |
| * NOTE: This method automatically enable feature `default reflection`. |
| * |
| * @param type The type |
| * @param value Its default value |
| * @return The current instance |
| */ |
| public ReflectData setDefaultGeneratedValue(Type type, Object value) { |
| this.defaultValues.put(type, value); |
| this.setDefaultsGenerated(true); |
| return this; |
| } |
| |
| /** |
| * Get or create new value instance for a field |
| * |
| * @param type The current type |
| * @param field A child field |
| * @return The default field value |
| */ |
| protected Object getOrCreateDefaultValue(Type type, Field field) { |
| Object defaultValue = null; |
| field.setAccessible(true); |
| try { |
| Object typeValue = getOrCreateDefaultValue(type); |
| if (typeValue != null) { |
| defaultValue = field.get(typeValue); |
| } |
| } catch (Exception e) { |
| |
| } |
| return defaultValue; |
| } |
| |
| /** |
| * Get or create new value instance for a type. |
| * |
| * New instances will be instantiated using no-arg constructors. The newly |
| * created one will be cached for later use. |
| * |
| * @param type The type |
| * @return The value |
| */ |
| protected Object getOrCreateDefaultValue(Type type) { |
| return this.defaultValues.computeIfAbsent(type, ignored -> { |
| try { |
| Constructor constructor = ((Class) type).getDeclaredConstructor(); |
| constructor.setAccessible(true); |
| return constructor.newInstance(); |
| } catch (ClassCastException | InstantiationException | IllegalAccessException | NoSuchMethodException |
| | InvocationTargetException e) { |
| // do nothing |
| } |
| return null; |
| }); |
| } |
| |
| @Override |
| public DatumReader createDatumReader(Schema schema) { |
| return new ReflectDatumReader(schema, schema, this); |
| } |
| |
| @Override |
| public DatumReader createDatumReader(Schema writer, Schema reader) { |
| return new ReflectDatumReader(writer, reader, this); |
| } |
| |
| @Override |
| public DatumWriter createDatumWriter(Schema schema) { |
| return new ReflectDatumWriter(schema, this); |
| } |
| |
| @Override |
| public void setField(Object record, String name, int position, Object value) { |
| setField(record, name, position, value, null); |
| } |
| |
| @Override |
| protected void setField(Object record, String name, int position, Object value, Object state) { |
| if (record instanceof IndexedRecord) { |
| super.setField(record, name, position, value); |
| return; |
| } |
| try { |
| getAccessorForField(record, name, position, state).set(record, value); |
| } catch (IllegalAccessException | IOException e) { |
| throw new AvroRuntimeException(e); |
| } |
| } |
| |
| @Override |
| public Object getField(Object record, String name, int position) { |
| return getField(record, name, position, null); |
| } |
| |
| @Override |
| protected Object getField(Object record, String name, int pos, Object state) { |
| if (record instanceof IndexedRecord) { |
| return super.getField(record, name, pos); |
| } |
| try { |
| return getAccessorForField(record, name, pos, state).get(record); |
| } catch (IllegalAccessException e) { |
| throw new AvroRuntimeException(e); |
| } |
| } |
| |
| private FieldAccessor getAccessorForField(Object record, String name, int pos, Object optionalState) { |
| if (optionalState != null) { |
| return ((FieldAccessor[]) optionalState)[pos]; |
| } |
| return getFieldAccessor(record.getClass(), name); |
| } |
| |
| @Override |
| protected boolean isRecord(Object datum) { |
| if (datum == null) |
| return false; |
| if (super.isRecord(datum)) |
| return true; |
| if (datum instanceof Collection) |
| return false; |
| if (datum instanceof Map) |
| return false; |
| if (datum instanceof GenericFixed) |
| return false; |
| return getSchema(datum.getClass()).getType() == Schema.Type.RECORD; |
| } |
| |
| /** |
| * Returns true for arrays and false otherwise, with the following exceptions: |
| * |
| * <ul> |
| * <li> |
| * <p> |
| * Returns true for non-string-keyed maps, which are written as an array of |
| * key/value pair records. |
| * <li> |
| * <p> |
| * Returns false for arrays of bytes, since those should be treated as byte data |
| * type instead. |
| * </ul> |
| */ |
| @Override |
| protected boolean isArray(Object datum) { |
| if (datum == null) |
| return false; |
| Class c = datum.getClass(); |
| return (datum instanceof Collection) || (c.isArray() && c.getComponentType() != Byte.TYPE) || isNonStringMap(datum); |
| } |
| |
| @Override |
| protected Collection getArrayAsCollection(Object datum) { |
| return (datum instanceof Map) ? ((Map) datum).entrySet() : (Collection) datum; |
| } |
| |
| @Override |
| protected boolean isBytes(Object datum) { |
| if (datum == null) |
| return false; |
| if (super.isBytes(datum)) |
| return true; |
| Class c = datum.getClass(); |
| return c.isArray() && c.getComponentType() == Byte.TYPE; |
| } |
| |
| @Override |
| protected Schema getRecordSchema(Object record) { |
| if (record instanceof GenericContainer) |
| return super.getRecordSchema(record); |
| return getSchema(record.getClass()); |
| } |
| |
| @Override |
| public boolean validate(Schema schema, Object datum) { |
| switch (schema.getType()) { |
| case ARRAY: |
| if (!datum.getClass().isArray()) |
| return super.validate(schema, datum); |
| int length = java.lang.reflect.Array.getLength(datum); |
| for (int i = 0; i < length; i++) |
| if (!validate(schema.getElementType(), java.lang.reflect.Array.get(datum, i))) |
| return false; |
| return true; |
| default: |
| return super.validate(schema, datum); |
| } |
| } |
| |
| static final ClassValue<ClassAccessorData> ACCESSOR_CACHE = new ClassValue<ClassAccessorData>() { |
| @Override |
| protected ClassAccessorData computeValue(Class<?> c) { |
| if (!IndexedRecord.class.isAssignableFrom(c)) { |
| return new ClassAccessorData(c); |
| } |
| return null; |
| } |
| }; |
| |
| static class ClassAccessorData { |
| private final Class<?> clazz; |
| private final Map<String, FieldAccessor> byName = new HashMap<>(); |
| // getAccessorsFor is already synchronized, no need to wrap |
| final Map<Schema, FieldAccessor[]> bySchema = new WeakHashMap<>(); |
| |
| private ClassAccessorData(Class<?> c) { |
| clazz = c; |
| for (Field f : getFields(c, false)) { |
| if (f.isAnnotationPresent(AvroIgnore.class)) { |
| continue; |
| } |
| FieldAccessor accessor = ReflectionUtil.getFieldAccess().getAccessor(f); |
| AvroName avroname = f.getAnnotation(AvroName.class); |
| byName.put((avroname != null ? avroname.value() : f.getName()), accessor); |
| } |
| } |
| |
| /** |
| * Return the field accessors as an array, indexed by the field index of the |
| * given schema. |
| */ |
| private synchronized FieldAccessor[] getAccessorsFor(Schema schema) { |
| // if synchronized is removed from this method, adjust bySchema appropriately |
| FieldAccessor[] result = bySchema.get(schema); |
| if (result == null) { |
| result = createAccessorsFor(schema); |
| bySchema.put(schema, result); |
| } |
| return result; |
| } |
| |
| private FieldAccessor[] createAccessorsFor(Schema schema) { |
| List<Schema.Field> avroFields = schema.getFields(); |
| FieldAccessor[] result = new FieldAccessor[avroFields.size()]; |
| for (Schema.Field avroField : schema.getFields()) { |
| result[avroField.pos()] = byName.get(avroField.name()); |
| } |
| return result; |
| } |
| |
| private FieldAccessor getAccessorFor(String fieldName) { |
| FieldAccessor result = byName.get(fieldName); |
| if (result == null) { |
| throw new AvroRuntimeException("No field named " + fieldName + " in: " + clazz); |
| } |
| return result; |
| } |
| } |
| |
| private ClassAccessorData getClassAccessorData(Class<?> c) { |
| return ACCESSOR_CACHE.get(c); |
| } |
| |
| private FieldAccessor[] getFieldAccessors(Class<?> c, Schema s) { |
| ClassAccessorData data = getClassAccessorData(c); |
| if (data != null) { |
| return data.getAccessorsFor(s); |
| } |
| return null; |
| } |
| |
| private FieldAccessor getFieldAccessor(Class<?> c, String fieldName) { |
| ClassAccessorData data = getClassAccessorData(c); |
| if (data != null) { |
| return data.getAccessorFor(fieldName); |
| } |
| return null; |
| } |
| |
| /** @deprecated Replaced by {@link SpecificData#CLASS_PROP} */ |
| @Deprecated |
| static final String CLASS_PROP = "java-class"; |
| /** @deprecated Replaced by {@link SpecificData#KEY_CLASS_PROP} */ |
| @Deprecated |
| static final String KEY_CLASS_PROP = "java-key-class"; |
| /** @deprecated Replaced by {@link SpecificData#ELEMENT_PROP} */ |
| @Deprecated |
| static final String ELEMENT_PROP = "java-element-class"; |
| |
| private static final Map<String, Class> CLASS_CACHE = new ConcurrentHashMap<>(); |
| |
| static Class getClassProp(Schema schema, String prop) { |
| String name = schema.getProp(prop); |
| if (name == null) |
| return null; |
| Class c = CLASS_CACHE.get(name); |
| if (c != null) |
| return c; |
| try { |
| c = ClassUtils.forName(name); |
| CLASS_CACHE.put(name, c); |
| } catch (ClassNotFoundException e) { |
| throw new AvroRuntimeException(e); |
| } |
| return c; |
| } |
| |
| private static final Class BYTES_CLASS = byte[].class; |
| private static final IdentityHashMap<Class, Class> ARRAY_CLASSES; |
| |
| static { |
| ARRAY_CLASSES = new IdentityHashMap<>(); |
| ARRAY_CLASSES.put(byte.class, byte[].class); |
| ARRAY_CLASSES.put(char.class, char[].class); |
| ARRAY_CLASSES.put(short.class, short[].class); |
| ARRAY_CLASSES.put(int.class, int[].class); |
| ARRAY_CLASSES.put(long.class, long[].class); |
| ARRAY_CLASSES.put(float.class, float[].class); |
| ARRAY_CLASSES.put(double.class, double[].class); |
| ARRAY_CLASSES.put(boolean.class, boolean[].class); |
| } |
| |
| /** |
| * It returns false for non-string-maps because Avro writes out such maps as an |
| * array of records. Even their JSON representation is an array. |
| */ |
| @Override |
| protected boolean isMap(Object datum) { |
| return (datum instanceof Map) && !isNonStringMap(datum); |
| } |
| |
| /* |
| * Without the Field or Schema corresponding to the datum, it is not possible to |
| * accurately find out the non-stringable nature of the key. So we check the |
| * class of the keys. If the map is empty, then it doesn't matter whether its |
| * considered a string-key map or a non-string-key map |
| */ |
| private boolean isNonStringMap(Object datum) { |
| if (datum instanceof Map) { |
| Map m = (Map) datum; |
| if (m.size() > 0) { |
| Class keyClass = m.keySet().iterator().next().getClass(); |
| return !isStringable(keyClass) && !isStringType(keyClass); |
| } |
| } |
| return false; |
| } |
| |
| @Override |
| public Class getClass(Schema schema) { |
| // see if the element class will be converted and use that class |
| Conversion<?> conversion = getConversionFor(schema.getLogicalType()); |
| if (conversion != null) { |
| return conversion.getConvertedType(); |
| } |
| |
| switch (schema.getType()) { |
| case ARRAY: |
| Class collectionClass = getClassProp(schema, CLASS_PROP); |
| if (collectionClass != null) |
| return collectionClass; |
| Class elementClass = getClass(schema.getElementType()); |
| if (elementClass.isPrimitive()) { |
| // avoid expensive translation to array type when primitive |
| return ARRAY_CLASSES.get(elementClass); |
| } else { |
| return java.lang.reflect.Array.newInstance(elementClass, 0).getClass(); |
| } |
| case STRING: |
| Class stringClass = getClassProp(schema, CLASS_PROP); |
| if (stringClass != null) |
| return stringClass; |
| return String.class; |
| case BYTES: |
| return BYTES_CLASS; |
| case INT: |
| String intClass = schema.getProp(CLASS_PROP); |
| if (Byte.class.getName().equals(intClass)) |
| return Byte.TYPE; |
| if (Short.class.getName().equals(intClass)) |
| return Short.TYPE; |
| if (Character.class.getName().equals(intClass)) |
| return Character.TYPE; |
| default: |
| return super.getClass(schema); |
| } |
| } |
| |
| static final String NS_MAP_ARRAY_RECORD = // record name prefix |
| "org.apache.avro.reflect.Pair"; |
| static final String NS_MAP_KEY = "key"; // name of key field |
| static final int NS_MAP_KEY_INDEX = 0; // index of key field |
| static final String NS_MAP_VALUE = "value"; // name of value field |
| static final int NS_MAP_VALUE_INDEX = 1; // index of value field |
| |
| /* |
| * Non-string map-keys need special handling and we convert it to an array of |
| * records as: [{"key":{...}, "value":{...}}] |
| */ |
| Schema createNonStringMapSchema(Type keyType, Type valueType, Map<String, Schema> names) { |
| Schema keySchema = createSchema(keyType, names); |
| Schema valueSchema = createSchema(valueType, names); |
| Schema.Field keyField = new Schema.Field(NS_MAP_KEY, keySchema, null, null); |
| Schema.Field valueField = new Schema.Field(NS_MAP_VALUE, valueSchema, null, null); |
| String name = getNameForNonStringMapRecord(keyType, valueType, keySchema, valueSchema); |
| Schema elementSchema = Schema.createRecord(name, null, null, false); |
| elementSchema.setFields(Arrays.asList(keyField, valueField)); |
| Schema arraySchema = Schema.createArray(elementSchema); |
| return arraySchema; |
| } |
| |
| /* |
| * Gets a unique and consistent name per key-value pair. So if the same |
| * key-value are seen in another map, the same name is generated again. |
| */ |
| private String getNameForNonStringMapRecord(Type keyType, Type valueType, Schema keySchema, Schema valueSchema) { |
| |
| // Generate a nice name for classes in java* package |
| if (keyType instanceof Class && valueType instanceof Class) { |
| |
| Class keyClass = (Class) keyType; |
| Class valueClass = (Class) valueType; |
| Package pkg1 = keyClass.getPackage(); |
| Package pkg2 = valueClass.getPackage(); |
| |
| if (pkg1 != null && pkg1.getName().startsWith("java") && pkg2 != null && pkg2.getName().startsWith("java")) { |
| return NS_MAP_ARRAY_RECORD + keyClass.getSimpleName() + valueClass.getSimpleName(); |
| } |
| } |
| |
| String name = keySchema.getFullName() + valueSchema.getFullName(); |
| long fingerprint = SchemaNormalization.fingerprint64(name.getBytes(StandardCharsets.UTF_8)); |
| |
| if (fingerprint < 0) |
| fingerprint = -fingerprint; // ignore sign |
| String fpString = Long.toString(fingerprint, 16); // hex |
| return NS_MAP_ARRAY_RECORD + fpString; |
| } |
| |
| static boolean isNonStringMapSchema(Schema s) { |
| if (s != null && s.getType() == Schema.Type.ARRAY) { |
| Class c = getClassProp(s, CLASS_PROP); |
| return c != null && Map.class.isAssignableFrom(c); |
| } |
| return false; |
| } |
| |
| /** |
| * Get default value for a schema field. Derived classes can override this |
| * method to provide values based on object instantiation |
| * |
| * @param type Type |
| * @param field Field |
| * @param fieldSchema Schema of the field |
| * @return The default value |
| */ |
| protected Object createSchemaDefaultValue(Type type, Field field, Schema fieldSchema) { |
| Object defaultValue; |
| if (defaultGenerated) { |
| defaultValue = getOrCreateDefaultValue(type, field); |
| if (defaultValue != null) { |
| return deepCopy(fieldSchema, defaultValue); |
| } |
| // if we can't get the default value, try to use previous code below |
| } |
| |
| AvroDefault defaultAnnotation = field.getAnnotation(AvroDefault.class); |
| defaultValue = (defaultAnnotation == null) ? null : Schema.parseJsonToObject(defaultAnnotation.value()); |
| |
| if (defaultValue == null && fieldSchema.getType() == Schema.Type.UNION) { |
| Schema defaultType = fieldSchema.getTypes().get(0); |
| if (defaultType.getType() == Schema.Type.NULL) { |
| defaultValue = JsonProperties.NULL_VALUE; |
| } |
| } |
| return defaultValue; |
| } |
| |
| @Override |
| protected Schema createSchema(Type type, Map<String, Schema> names) { |
| if (type instanceof GenericArrayType) { // generic array |
| Type component = ((GenericArrayType) type).getGenericComponentType(); |
| if (component == Byte.TYPE) // byte array |
| return Schema.create(Schema.Type.BYTES); |
| Schema result = Schema.createArray(createSchema(component, names)); |
| setElement(result, component); |
| return result; |
| } else if (type instanceof ParameterizedType) { |
| ParameterizedType ptype = (ParameterizedType) type; |
| Class raw = (Class) ptype.getRawType(); |
| Type[] params = ptype.getActualTypeArguments(); |
| if (Map.class.isAssignableFrom(raw)) { // Map |
| Class key = (Class) params[0]; |
| if (isStringable(key)) { // Stringable key |
| Schema schema = Schema.createMap(createSchema(params[1], names)); |
| schema.addProp(KEY_CLASS_PROP, key.getName()); |
| return schema; |
| } else if (key != String.class) { |
| Schema schema = createNonStringMapSchema(params[0], params[1], names); |
| schema.addProp(CLASS_PROP, raw.getName()); |
| return schema; |
| } |
| } else if (Collection.class.isAssignableFrom(raw)) { // Collection |
| if (params.length != 1) |
| throw new AvroTypeException("No array type specified."); |
| Schema schema = Schema.createArray(createSchema(params[0], names)); |
| schema.addProp(CLASS_PROP, raw.getName()); |
| return schema; |
| } |
| } else if ((type == Byte.class) || (type == Byte.TYPE)) { |
| Schema result = Schema.create(Schema.Type.INT); |
| result.addProp(CLASS_PROP, Byte.class.getName()); |
| return result; |
| } else if ((type == Short.class) || (type == Short.TYPE)) { |
| Schema result = Schema.create(Schema.Type.INT); |
| result.addProp(CLASS_PROP, Short.class.getName()); |
| return result; |
| } else if ((type == Character.class) || (type == Character.TYPE)) { |
| Schema result = Schema.create(Schema.Type.INT); |
| result.addProp(CLASS_PROP, Character.class.getName()); |
| return result; |
| } else if (type instanceof Class) { // Class |
| Class<?> c = (Class<?>) type; |
| if (c.isPrimitive() || // primitives |
| c == Void.class || c == Boolean.class || c == Integer.class || c == Long.class || c == Float.class |
| || c == Double.class || c == Byte.class || c == Short.class || c == Character.class) |
| return super.createSchema(type, names); |
| if (c.isArray()) { // array |
| Class component = c.getComponentType(); |
| if (component == Byte.TYPE) { // byte array |
| Schema result = Schema.create(Schema.Type.BYTES); |
| result.addProp(CLASS_PROP, c.getName()); |
| return result; |
| } |
| Schema result = Schema.createArray(createSchema(component, names)); |
| result.addProp(CLASS_PROP, c.getName()); |
| setElement(result, component); |
| return result; |
| } |
| AvroSchema explicit = c.getAnnotation(AvroSchema.class); |
| if (explicit != null) // explicit schema |
| return new Schema.Parser().parse(explicit.value()); |
| if (CharSequence.class.isAssignableFrom(c)) // String |
| return Schema.create(Schema.Type.STRING); |
| if (ByteBuffer.class.isAssignableFrom(c)) // bytes |
| return Schema.create(Schema.Type.BYTES); |
| if (Collection.class.isAssignableFrom(c)) // array |
| throw new AvroRuntimeException("Can't find element type of Collection"); |
| Conversion<?> conversion = getConversionByClass(c); |
| if (conversion != null) { |
| return conversion.getRecommendedSchema(); |
| } |
| String fullName = c.getName(); |
| Schema schema = names.get(fullName); |
| if (schema == null) { |
| AvroDoc annotatedDoc = c.getAnnotation(AvroDoc.class); // Docstring |
| String doc = (annotatedDoc != null) ? annotatedDoc.value() : null; |
| String name = c.getSimpleName(); |
| String space = c.getPackage() == null ? "" : c.getPackage().getName(); |
| if (c.getEnclosingClass() != null) // nested class |
| space = c.getEnclosingClass().getName(); |
| Union union = c.getAnnotation(Union.class); |
| if (union != null) { // union annotated |
| return getAnnotatedUnion(union, names); |
| } else if (isStringable(c)) { // Stringable |
| Schema result = Schema.create(Schema.Type.STRING); |
| result.addProp(CLASS_PROP, c.getName()); |
| return result; |
| } else if (c.isEnum()) { // Enum |
| List<String> symbols = new ArrayList<>(); |
| Enum[] constants = (Enum[]) c.getEnumConstants(); |
| for (Enum constant : constants) |
| symbols.add(constant.name()); |
| schema = Schema.createEnum(name, doc, space, symbols); |
| consumeAvroAliasAnnotation(c, schema); |
| } else if (GenericFixed.class.isAssignableFrom(c)) { // fixed |
| int size = c.getAnnotation(FixedSize.class).value(); |
| schema = Schema.createFixed(name, doc, space, size); |
| consumeAvroAliasAnnotation(c, schema); |
| } else if (IndexedRecord.class.isAssignableFrom(c)) { // specific |
| return super.createSchema(type, names); |
| } else { // record |
| List<Schema.Field> fields = new ArrayList<>(); |
| boolean error = Throwable.class.isAssignableFrom(c); |
| schema = Schema.createRecord(name, doc, space, error); |
| consumeAvroAliasAnnotation(c, schema); |
| names.put(c.getName(), schema); |
| for (Field field : getCachedFields(c)) |
| if ((field.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC)) == 0 |
| && !field.isAnnotationPresent(AvroIgnore.class)) { |
| Schema fieldSchema = createFieldSchema(field, names); |
| annotatedDoc = field.getAnnotation(AvroDoc.class); // Docstring |
| doc = (annotatedDoc != null) ? annotatedDoc.value() : null; |
| |
| Object defaultValue = createSchemaDefaultValue(type, field, fieldSchema); |
| |
| AvroName annotatedName = field.getAnnotation(AvroName.class); // Rename fields |
| String fieldName = (annotatedName != null) ? annotatedName.value() : field.getName(); |
| if (STRING_OUTER_PARENT_REFERENCE.equals(fieldName)) { |
| throw new AvroTypeException("Class " + fullName + " must be a static inner class"); |
| } |
| Schema.Field recordField = new Schema.Field(fieldName, fieldSchema, doc, defaultValue); |
| |
| AvroMeta[] metadata = field.getAnnotationsByType(AvroMeta.class); // add metadata |
| for (AvroMeta meta : metadata) { |
| if (recordField.getObjectProps().containsKey(meta.key())) { |
| throw new AvroTypeException("Duplicate field prop key: " + meta.key()); |
| } |
| recordField.addProp(meta.key(), meta.value()); |
| } |
| for (Schema.Field f : fields) { |
| if (f.name().equals(fieldName)) |
| throw new AvroTypeException("double field entry: " + fieldName); |
| } |
| |
| consumeFieldAlias(field, recordField); |
| |
| fields.add(recordField); |
| } |
| if (error) // add Throwable message |
| fields.add(new Schema.Field("detailMessage", THROWABLE_MESSAGE, null, null)); |
| schema.setFields(fields); |
| AvroMeta[] metadata = c.getAnnotationsByType(AvroMeta.class); |
| for (AvroMeta meta : metadata) { |
| if (schema.getObjectProps().containsKey(meta.key())) { |
| throw new AvroTypeException("Duplicate type prop key: " + meta.key()); |
| } |
| schema.addProp(meta.key(), meta.value()); |
| } |
| } |
| names.put(fullName, schema); |
| } |
| return schema; |
| } |
| return super.createSchema(type, names); |
| } |
| |
| @Override |
| protected boolean isStringable(Class<?> c) { |
| return c.isAnnotationPresent(Stringable.class) || super.isStringable(c); |
| } |
| |
| private static final Schema THROWABLE_MESSAGE = makeNullable(Schema.create(Schema.Type.STRING)); |
| |
| // if array element type is a class with a union annotation, note it |
| // this is required because we cannot set a property on the union itself |
| private void setElement(Schema schema, Type element) { |
| if (!(element instanceof Class)) |
| return; |
| Class<?> c = (Class<?>) element; |
| Union union = c.getAnnotation(Union.class); |
| if (union != null) // element is annotated union |
| schema.addProp(ELEMENT_PROP, c.getName()); |
| } |
| |
| // construct a schema from a union annotation |
| private Schema getAnnotatedUnion(Union union, Map<String, Schema> names) { |
| List<Schema> branches = new ArrayList<>(); |
| for (Class branch : union.value()) |
| branches.add(createSchema(branch, names)); |
| return Schema.createUnion(branches); |
| } |
| |
| /** Create and return a union of the null schema and the provided schema. */ |
| public static Schema makeNullable(Schema schema) { |
| if (schema.getType() == Schema.Type.UNION) { |
| // check to see if the union already contains NULL |
| for (Schema subType : schema.getTypes()) { |
| if (subType.getType() == Schema.Type.NULL) { |
| return schema; |
| } |
| } |
| // add null as the first type in a new union |
| List<Schema> withNull = new ArrayList<>(); |
| withNull.add(Schema.create(Schema.Type.NULL)); |
| withNull.addAll(schema.getTypes()); |
| return Schema.createUnion(withNull); |
| } else { |
| // create a union with null |
| return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), schema)); |
| } |
| } |
| |
| private static final Map<Class<?>, Field[]> FIELDS_CACHE = new ConcurrentHashMap<>(); |
| |
| // Return of this class and its superclasses to serialize. |
| private static Field[] getCachedFields(Class<?> recordClass) { |
| return FIELDS_CACHE.computeIfAbsent(recordClass, rc -> getFields(rc, true)); |
| } |
| |
| private static Field[] getFields(Class<?> recordClass, boolean excludeJava) { |
| Field[] fieldsList; |
| Map<String, Field> fields = new LinkedHashMap<>(); |
| Class<?> c = recordClass; |
| do { |
| if (excludeJava && c.getPackage() != null && c.getPackage().getName().startsWith("java.")) |
| break; // skip java built-in classes |
| Field[] declaredFields = c.getDeclaredFields(); |
| Arrays.sort(declaredFields, Comparator.comparing(Field::getName)); |
| for (Field field : declaredFields) |
| if ((field.getModifiers() & (Modifier.TRANSIENT | Modifier.STATIC)) == 0) |
| if (fields.put(field.getName(), field) != null) |
| throw new AvroTypeException(c + " contains two fields named: " + field); |
| c = c.getSuperclass(); |
| } while (c != null); |
| fieldsList = fields.values().toArray(new Field[0]); |
| return fieldsList; |
| } |
| |
| /** Create a schema for a field. */ |
| protected Schema createFieldSchema(Field field, Map<String, Schema> names) { |
| AvroEncode enc = field.getAnnotation(AvroEncode.class); |
| if (enc != null) |
| try { |
| return enc.using().getDeclaredConstructor().newInstance().getSchema(); |
| } catch (Exception e) { |
| throw new AvroRuntimeException("Could not create schema from custom serializer for " + field.getName()); |
| } |
| |
| AvroSchema explicit = field.getAnnotation(AvroSchema.class); |
| if (explicit != null) // explicit schema |
| return new Schema.Parser().parse(explicit.value()); |
| |
| Union union = field.getAnnotation(Union.class); |
| if (union != null) |
| return getAnnotatedUnion(union, names); |
| |
| Schema schema = createSchema(field.getGenericType(), names); |
| if (field.isAnnotationPresent(Stringable.class)) { // Stringable |
| schema = Schema.create(Schema.Type.STRING); |
| } |
| if (field.isAnnotationPresent(Nullable.class)) // nullable |
| schema = makeNullable(schema); |
| return schema; |
| } |
| |
| /** |
| * Return the protocol for a Java interface. |
| * |
| * <p> |
| * The correct name of the method parameters needs the <code>-parameters</code> |
| * java compiler argument. More info at https://openjdk.java.net/jeps/118 |
| */ |
| @Override |
| public Protocol getProtocol(Class iface) { |
| Protocol protocol = new Protocol(iface.getSimpleName(), |
| iface.getPackage() == null ? "" : iface.getPackage().getName()); |
| Map<String, Schema> names = new LinkedHashMap<>(); |
| Map<String, Message> messages = protocol.getMessages(); |
| Map<TypeVariable<?>, Type> genericTypeVariableMap = ReflectionUtil.resolveTypeVariables(iface); |
| for (Method method : iface.getMethods()) { |
| if ((method.getModifiers() & Modifier.STATIC) == 0) { |
| String name = method.getName(); |
| if (messages.containsKey(name)) |
| throw new AvroTypeException("Two methods with same name: " + name); |
| messages.put(name, getMessage(method, protocol, names, genericTypeVariableMap)); |
| } |
| } |
| |
| // reverse types, since they were defined in reference order |
| List<Schema> types = new ArrayList<>(names.values()); |
| Collections.reverse(types); |
| protocol.setTypes(types); |
| |
| return protocol; |
| } |
| |
| private Message getMessage(Method method, Protocol protocol, Map<String, Schema> names, |
| Map<? extends Type, Type> genericTypeMap) { |
| List<Schema.Field> fields = new ArrayList<>(); |
| for (Parameter parameter : method.getParameters()) { |
| Schema paramSchema = getSchema(genericTypeMap.getOrDefault(parameter.getParameterizedType(), parameter.getType()), |
| names); |
| for (Annotation annotation : parameter.getAnnotations()) { |
| if (annotation instanceof AvroSchema) // explicit schema |
| paramSchema = new Schema.Parser().parse(((AvroSchema) annotation).value()); |
| else if (annotation instanceof Union) // union |
| paramSchema = getAnnotatedUnion(((Union) annotation), names); |
| else if (annotation instanceof Nullable) // nullable |
| paramSchema = makeNullable(paramSchema); |
| } |
| fields.add(new Schema.Field(parameter.getName(), paramSchema, null /* doc */, null)); |
| } |
| |
| Schema request = Schema.createRecord(fields); |
| |
| Type genericReturnType = method.getGenericReturnType(); |
| Type returnType = genericTypeMap.getOrDefault(genericReturnType, genericReturnType); |
| Union union = method.getAnnotation(Union.class); |
| Schema response = union == null ? getSchema(returnType, names) : getAnnotatedUnion(union, names); |
| if (method.isAnnotationPresent(Nullable.class)) // nullable |
| response = makeNullable(response); |
| |
| AvroSchema explicit = method.getAnnotation(AvroSchema.class); |
| if (explicit != null) // explicit schema |
| response = new Schema.Parser().parse(explicit.value()); |
| |
| List<Schema> errs = new ArrayList<>(); |
| errs.add(Protocol.SYSTEM_ERROR); // every method can throw |
| for (Type err : method.getGenericExceptionTypes()) |
| errs.add(getSchema(err, names)); |
| Schema errors = Schema.createUnion(errs); |
| return protocol.createMessage(method.getName(), null /* doc */, Collections.emptyMap() /* propMap */, request, |
| response, errors); |
| } |
| |
| private Schema getSchema(Type type, Map<String, Schema> names) { |
| try { |
| return createSchema(type, names); |
| } catch (AvroTypeException e) { // friendly exception |
| throw new AvroTypeException("Error getting schema for " + type + ": " + e.getMessage(), e); |
| } |
| } |
| |
| @Override |
| protected int compare(Object o1, Object o2, Schema s, boolean equals) { |
| switch (s.getType()) { |
| case ARRAY: |
| if (!o1.getClass().isArray()) |
| break; |
| Schema elementType = s.getElementType(); |
| int l1 = java.lang.reflect.Array.getLength(o1); |
| int l2 = java.lang.reflect.Array.getLength(o2); |
| int l = Math.min(l1, l2); |
| for (int i = 0; i < l; i++) { |
| int compare = compare(java.lang.reflect.Array.get(o1, i), java.lang.reflect.Array.get(o2, i), elementType, |
| equals); |
| if (compare != 0) |
| return compare; |
| } |
| return Integer.compare(l1, l2); |
| case BYTES: |
| if (!o1.getClass().isArray()) |
| break; |
| byte[] b1 = (byte[]) o1; |
| byte[] b2 = (byte[]) o2; |
| return BinaryData.compareBytes(b1, 0, b1.length, b2, 0, b2.length); |
| } |
| return super.compare(o1, o2, s, equals); |
| } |
| |
| @Override |
| protected Object getRecordState(Object record, Schema schema) { |
| return getFieldAccessors(record.getClass(), schema); |
| } |
| |
| private void consumeAvroAliasAnnotation(Class<?> c, Schema schema) { |
| AvroAlias[] aliases = c.getAnnotationsByType(AvroAlias.class); |
| for (AvroAlias alias : aliases) { |
| String space = alias.space(); |
| if (AvroAlias.NULL.equals(space)) |
| space = null; |
| schema.addAlias(alias.alias(), space); |
| } |
| } |
| |
| private void consumeFieldAlias(Field field, Schema.Field recordField) { |
| AvroAlias[] aliases = field.getAnnotationsByType(AvroAlias.class); |
| for (AvroAlias alias : aliases) { |
| if (!alias.space().equals(AvroAlias.NULL)) { |
| throw new AvroRuntimeException( |
| "Namespaces are not allowed on field aliases. " + "Offending field: " + recordField.name()); |
| } |
| recordField.addAlias(alias.alias()); |
| } |
| } |
| |
| @Override |
| public Object createFixed(Object old, Schema schema) { |
| // SpecificData will try to instantiate the type returned by getClass, but |
| // that is the converted class and can't be constructed. |
| LogicalType logicalType = schema.getLogicalType(); |
| if (logicalType != null) { |
| Conversion<?> conversion = getConversionFor(schema.getLogicalType()); |
| if (conversion != null) { |
| return new GenericData.Fixed(schema); |
| } |
| } |
| return super.createFixed(old, schema); |
| } |
| |
| @Override |
| public Object newRecord(Object old, Schema schema) { |
| // SpecificData will try to instantiate the type returned by getClass, but |
| // that is the converted class and can't be constructed. |
| LogicalType logicalType = schema.getLogicalType(); |
| if (logicalType != null) { |
| Conversion<?> conversion = getConversionFor(schema.getLogicalType()); |
| if (conversion != null) { |
| return new GenericData.Record(schema); |
| } |
| } |
| return super.newRecord(old, schema); |
| } |
| } |