blob: 2ada3fb3b5cda8c5e0f98eb20180afbd64715a78 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.reflect;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Type;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.GenericArrayType;
import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.avro.AvroRemoteException;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.avro.Protocol.Message;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.FixedSize;
import org.apache.avro.io.BinaryData;
import com.thoughtworks.paranamer.CachingParanamer;
import com.thoughtworks.paranamer.Paranamer;
/** Utilities to use existing Java classes and interfaces via reflection. */
public class ReflectData extends SpecificData {
/** {@link ReflectData} implementation that permits null field values. The
* schema generated for each field is a union of its declared type and
* null. */
public static class AllowNull extends ReflectData {
private static final AllowNull INSTANCE = new AllowNull();
/** Return the singleton instance. */
public static AllowNull get() { return INSTANCE; }
protected Schema createFieldSchema(Field field, Map<String, Schema> names) {
Schema schema = super.createFieldSchema(field, names);
return makeNullable(schema);
}
}
private static final ReflectData INSTANCE = new ReflectData();
protected ReflectData() {}
/** Return the singleton instance. */
public static ReflectData get() { return INSTANCE; }
@Override
public void setField(Object record, String name, int position, Object o) {
if (record instanceof IndexedRecord) {
super.setField(record, name, position, o);
return;
}
try {
getField(record.getClass(), name).set(record, o);
} catch (IllegalAccessException e) {
throw new AvroRuntimeException(e);
}
}
@Override
public Object getField(Object record, String name, int position) {
if (record instanceof IndexedRecord)
return super.getField(record, name, position);
try {
return getField(record.getClass(), name).get(record);
} catch (IllegalAccessException e) {
throw new AvroRuntimeException(e);
}
}
@Override
protected boolean isRecord(Object datum) {
if (datum == null) return false;
return getSchema(datum.getClass()).getType() == Schema.Type.RECORD;
}
@Override
protected boolean isArray(Object datum) {
return (datum instanceof Collection) || datum.getClass().isArray();
}
@Override
protected boolean isBytes(Object datum) {
if (datum == null) return false;
if (super.isBytes(datum)) return true;
Class c = datum.getClass();
return c.isArray() && c.getComponentType() == Byte.TYPE;
}
@Override
protected Schema getRecordSchema(Object record) {
return getSchema(record.getClass());
}
@Override
public boolean validate(Schema schema, Object datum) {
switch (schema.getType()) {
case RECORD:
if (datum == null) return false;
Class c = datum.getClass();
for (Schema.Field f : schema.getFields()) {
try {
if (!validate(f.schema(),
getField(c, f.name()).get(datum)))
return false;
} catch (IllegalAccessException e) {
throw new AvroRuntimeException(e);
}
}
return true;
case ARRAY:
if (datum instanceof Collection) { // collection
for (Object element : (Collection)datum)
if (!validate(schema.getElementType(), element))
return false;
return true;
} else if (datum.getClass().isArray()) { // array
int length = java.lang.reflect.Array.getLength(datum);
for (int i = 0; i < length; i++)
if (!validate(schema.getElementType(),
java.lang.reflect.Array.get(datum, i)))
return false;
return true;
}
return false;
default:
return super.validate(schema, datum);
}
}
private static final Map<Class,Map<String,Field>> FIELD_CACHE =
new ConcurrentHashMap<Class,Map<String,Field>>();
/** Return the named field of the provided class. Implementation caches
* values, since this is used at runtime to get and set fields. */
private static Field getField(Class c, String name) {
Map<String,Field> fields = FIELD_CACHE.get(c);
if (fields == null) {
fields = new ConcurrentHashMap<String,Field>();
FIELD_CACHE.put(c, fields);
}
Field f = fields.get(name);
if (f == null) {
f = findField(c, name);
fields.put(name, f);
}
return f;
}
private static Field findField(Class c, String name) {
do {
try {
Field f = c.getDeclaredField(name);
f.setAccessible(true);
return f;
} catch (NoSuchFieldException e) {}
c = c.getSuperclass();
} while (c != null);
throw new AvroRuntimeException("No field named "+name+" in: "+c);
}
static final String CLASS_PROP = "java-class";
static final String ELEMENT_PROP = "java-element-class";
static Class getClassProp(Schema schema, String prop) {
String name = schema.getProp(prop);
if (name == null) return null;
try {
return Class.forName(name);
} catch (ClassNotFoundException e) {
throw new AvroRuntimeException(e);
}
}
private static final Class BYTES_CLASS = new byte[0].getClass();
@Override
public Class getClass(Schema schema) {
switch (schema.getType()) {
case ARRAY:
Class collectionClass = getClassProp(schema, CLASS_PROP);
if (collectionClass != null)
return collectionClass;
return java.lang.reflect.Array.newInstance(getClass(schema.getElementType()),0).getClass();
case STRING: return String.class;
case BYTES: return BYTES_CLASS;
case INT:
if (Short.class.getName().equals(schema.getProp(CLASS_PROP)))
return Short.TYPE;
default:
return super.getClass(schema);
}
}
@Override
@SuppressWarnings(value="unchecked")
protected Schema createSchema(Type type, Map<String,Schema> names) {
if (type instanceof GenericArrayType) { // generic array
Type component = ((GenericArrayType)type).getGenericComponentType();
if (component == Byte.TYPE) // byte array
return Schema.create(Schema.Type.BYTES);
Schema result = Schema.createArray(createSchema(component, names));
setElement(result, component);
return result;
} else if (type instanceof ParameterizedType) {
ParameterizedType ptype = (ParameterizedType)type;
Class raw = (Class)ptype.getRawType();
Type[] params = ptype.getActualTypeArguments();
if (Map.class.isAssignableFrom(raw)) { // Map
Type key = params[0];
Type value = params[1];
if (!(key == String.class))
throw new AvroTypeException("Map key class not String: "+key);
return Schema.createMap(createSchema(value, names));
} else if (Collection.class.isAssignableFrom(raw)) { // Collection
if (params.length != 1)
throw new AvroTypeException("No array type specified.");
Schema schema = Schema.createArray(createSchema(params[0], names));
schema.addProp(CLASS_PROP, raw.getName());
return schema;
}
} else if ((type == Short.class) || (type == Short.TYPE)) {
Schema result = Schema.create(Schema.Type.INT);
result.addProp(CLASS_PROP, Short.class.getName());
return result;
} else if (type instanceof Class) { // Class
Class<?> c = (Class<?>)type;
if (c.isPrimitive() || Number.class.isAssignableFrom(c)
|| c == Void.class || c == Boolean.class) // primitive
return super.createSchema(type, names);
if (c.isArray()) { // array
Class component = c.getComponentType();
if (component == Byte.TYPE) // byte array
return Schema.create(Schema.Type.BYTES);
Schema result = Schema.createArray(createSchema(component, names));
setElement(result, component);
return result;
}
if (CharSequence.class.isAssignableFrom(c)) // String
return Schema.create(Schema.Type.STRING);
String fullName = c.getName();
Schema schema = names.get(fullName);
if (schema == null) {
String name = c.getSimpleName();
String space = c.getPackage() == null ? "" : c.getPackage().getName();
if (c.getEnclosingClass() != null) // nested class
space = c.getEnclosingClass().getName() + "$";
Union union = c.getAnnotation(Union.class);
if (union != null) { // union annotated
return getAnnotatedUnion(union, names);
} else if (c.isAnnotationPresent(Stringable.class)){ // Stringable
Schema result = Schema.create(Schema.Type.STRING);
result.addProp(CLASS_PROP, c.getName());
return result;
} else if (c.isEnum()) { // Enum
List<String> symbols = new ArrayList<String>();
Enum[] constants = (Enum[])c.getEnumConstants();
for (int i = 0; i < constants.length; i++)
symbols.add(constants[i].name());
schema = Schema.createEnum(name, null /* doc */, space, symbols);
} else if (GenericFixed.class.isAssignableFrom(c)) { // fixed
int size = c.getAnnotation(FixedSize.class).value();
schema = Schema.createFixed(name, null /* doc */, space, size);
} else if (IndexedRecord.class.isAssignableFrom(c)) { // specific
return super.createSchema(type, names);
} else { // record
List<Schema.Field> fields = new ArrayList<Schema.Field>();
boolean error = Throwable.class.isAssignableFrom(c);
schema = Schema.createRecord(name, null /* doc */, space, error);
names.put(c.getName(), schema);
for (Field field : getFields(c))
if ((field.getModifiers()&(Modifier.TRANSIENT|Modifier.STATIC))==0){
Schema fieldSchema = createFieldSchema(field, names);
fields.add(new Schema.Field(field.getName(),
fieldSchema, null /* doc */, null));
}
if (error) // add Throwable message
fields.add(new Schema.Field("detailMessage", THROWABLE_MESSAGE,
null, null));
schema.setFields(fields);
}
names.put(fullName, schema);
}
return schema;
}
return super.createSchema(type, names);
}
private static final Schema THROWABLE_MESSAGE =
makeNullable(Schema.create(Schema.Type.STRING));
// if array element type is a class with a union annotation, note it
// this is required because we cannot set a property on the union itself
@SuppressWarnings(value="unchecked")
private void setElement(Schema schema, Type element) {
if (!(element instanceof Class)) return;
Class<?> c = (Class<?>)element;
Union union = c.getAnnotation(Union.class);
if (union != null) // element is annotated union
schema.addProp(ELEMENT_PROP, c.getName());
}
// construct a schema from a union annotation
private Schema getAnnotatedUnion(Union union, Map<String,Schema> names) {
List<Schema> branches = new ArrayList<Schema>();
for (Class branch : union.value())
branches.add(createSchema(branch, names));
return Schema.createUnion(branches);
}
/** Create and return a union of the null schema and the provided schema. */
public static Schema makeNullable(Schema schema) {
return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL),
schema));
}
// Return of this class and its superclasses to serialize.
// Not cached, since this is only used to create schemas, which are cached.
private Collection<Field> getFields(Class recordClass) {
Map<String,Field> fields = new LinkedHashMap<String,Field>();
Class c = recordClass;
do {
if (c.getPackage() != null
&& c.getPackage().getName().startsWith("java."))
break; // skip java built-in classes
for (Field field : c.getDeclaredFields())
if ((field.getModifiers() & (Modifier.TRANSIENT|Modifier.STATIC)) == 0)
if (fields.put(field.getName(), field) != null)
throw new AvroTypeException(c+" contains two fields named: "+field);
c = c.getSuperclass();
} while (c != null);
return fields.values();
}
/** Create a schema for a field. */
protected Schema createFieldSchema(Field field, Map<String, Schema> names) {
Schema schema = createSchema(field.getGenericType(), names);
if (field.isAnnotationPresent(Nullable.class)) // nullable
schema = makeNullable(schema);
return schema;
}
/** Return the protocol for a Java interface.
* <p>Note that this requires that <a
* href="http://paranamer.codehaus.org/">Paranamer</a> is run over compiled
* interface declarations, since Java 6 reflection does not provide access to
* method parameter names. See Avro's build.xml for an example. */
@Override
public Protocol getProtocol(Class iface) {
Protocol protocol =
new Protocol(iface.getSimpleName(),
iface.getPackage()==null?"":iface.getPackage().getName());
Map<String,Schema> names = new LinkedHashMap<String,Schema>();
Map<String,Message> messages = protocol.getMessages();
for (Method method : iface.getMethods())
if ((method.getModifiers() & Modifier.STATIC) == 0) {
String name = method.getName();
if (messages.containsKey(name))
throw new AvroTypeException("Two methods with same name: "+name);
messages.put(name, getMessage(method, protocol, names));
}
// reverse types, since they were defined in reference order
List<Schema> types = new ArrayList<Schema>();
types.addAll(names.values());
Collections.reverse(types);
protocol.setTypes(types);
return protocol;
}
private final Paranamer paranamer = new CachingParanamer();
private Message getMessage(Method method, Protocol protocol,
Map<String,Schema> names) {
List<Schema.Field> fields = new ArrayList<Schema.Field>();
String[] paramNames = paranamer.lookupParameterNames(method);
Type[] paramTypes = method.getGenericParameterTypes();
Annotation[][] annotations = method.getParameterAnnotations();
for (int i = 0; i < paramTypes.length; i++) {
Schema paramSchema = getSchema(paramTypes[i], names);
for (int j = 0; j < annotations[i].length; j++)
if (annotations[i][j] instanceof Union)
paramSchema = getAnnotatedUnion(((Union)annotations[i][j]), names);
else if (annotations[i][j] instanceof Nullable)
paramSchema = makeNullable(paramSchema);
String paramName = paramNames.length == paramTypes.length
? paramNames[i]
: paramSchema.getName()+i;
fields.add(new Schema.Field(paramName, paramSchema,
null /* doc */, null));
}
Schema request = Schema.createRecord(fields);
Union union = method.getAnnotation(Union.class);
Schema response = union == null
? getSchema(method.getGenericReturnType(), names)
: getAnnotatedUnion(union, names);
if (method.isAnnotationPresent(Nullable.class)) // nullable
response = makeNullable(response);
List<Schema> errs = new ArrayList<Schema>();
errs.add(Protocol.SYSTEM_ERROR); // every method can throw
for (Type err : method.getGenericExceptionTypes())
if (err != AvroRemoteException.class)
errs.add(getSchema(err, names));
Schema errors = Schema.createUnion(errs);
return protocol.createMessage(method.getName(), null /* doc */, request, response, errors);
}
private Schema getSchema(Type type, Map<String,Schema> names) {
try {
return createSchema(type, names);
} catch (AvroTypeException e) { // friendly exception
throw new AvroTypeException("Error getting schema for "+type+": "
+e.getMessage(), e);
}
}
@Override
public int compare(Object o1, Object o2, Schema s) {
switch (s.getType()) {
case ARRAY:
if (!o1.getClass().isArray())
break;
Schema elementType = s.getElementType();
int l1 = java.lang.reflect.Array.getLength(o1);
int l2 = java.lang.reflect.Array.getLength(o2);
int l = Math.min(l1, l2);
for (int i = 0; i < l; i++) {
int compare = compare(java.lang.reflect.Array.get(o1, i),
java.lang.reflect.Array.get(o2, i),
elementType);
if (compare != 0) return compare;
}
return l1 - l2;
case BYTES:
if (!o1.getClass().isArray())
break;
byte[] b1 = (byte[])o1;
byte[] b2 = (byte[])o2;
return BinaryData.compareBytes(b1, 0, b1.length, b2, 0, b2.length);
}
return super.compare(o1, o2, s);
}
}