// blob: 16047acba31a81de7fe7a9040d680172f0450e66 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.thrift;
import java.util.List;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.util.ClassUtils;
import org.apache.thrift.TBase;
import org.apache.thrift.TEnum;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.TFieldRequirementType;
import org.apache.thrift.TUnion;
import org.apache.thrift.protocol.TType;
import org.apache.thrift.meta_data.FieldMetaData;
import org.apache.thrift.meta_data.FieldValueMetaData;
import org.apache.thrift.meta_data.EnumMetaData;
import org.apache.thrift.meta_data.ListMetaData;
import org.apache.thrift.meta_data.SetMetaData;
import org.apache.thrift.meta_data.MapMetaData;
import org.apache.thrift.meta_data.StructMetaData;
/**
 * Utilities for serializing Thrift data in Avro format.
 *
 * <p>Extends {@link GenericData} so that Thrift-generated structs
 * ({@link TBase}) are handled as Avro records and Thrift enums ({@link TEnum})
 * as Avro enums. Schemas derived from Thrift classes and the per-schema field
 * lookup tables are memoized in concurrent maps owned by each instance.
 */
public class ThriftData extends GenericData {
  static final String THRIFT_TYPE = "thrift";
  /** Schema property used to record the original Thrift type ("byte", "short", "set"). */
  static final String THRIFT_PROP = "thrift";

  private static final ThriftData INSTANCE = new ThriftData();

  protected ThriftData() {}

  /** Return the singleton instance. */
  public static ThriftData get() { return INSTANCE; }

  /** Create a {@link ThriftDatumReader} that uses {@code schema} as both writer and reader schema. */
  @Override
  public DatumReader createDatumReader(Schema schema) {
    return new ThriftDatumReader(schema, schema, this);
  }

  /** Create a {@link ThriftDatumWriter} for {@code schema} backed by this instance. */
  @Override
  public DatumWriter createDatumWriter(Schema schema) {
    return new ThriftDatumWriter(schema, this);
  }

  /**
   * Set the field at position {@code pos} of Thrift struct {@code r}, looking
   * up the field table for the schema derived from {@code r}'s class.
   */
  @Override
  public void setField(Object r, String n, int pos, Object o) {
    setField(r, n, pos, o, getRecordState(r, getSchema(r.getClass())));
  }

  /**
   * Get the field at position {@code pos} of Thrift struct {@code r}, looking
   * up the field table for the schema derived from {@code r}'s class.
   */
  @Override
  public Object getField(Object r, String name, int pos) {
    return getField(r, name, pos, getRecordState(r, getSchema(r.getClass())));
  }

  /**
   * Set a field using a precomputed field table.
   *
   * @param r     the Thrift struct to modify; must be a {@link TBase}
   * @param n     the field name (unused; the position selects the field)
   * @param pos   the Avro field position, used as index into {@code state}
   * @param v     the value to set; a null value is ignored when {@code r} is
   *              a {@link TUnion}
   * @param state the {@code TFieldIdEnum[]} returned by {@link #getRecordState}
   */
  @Override
  protected void setField(Object r, String n, int pos, Object v, Object state) {
    if (v == null && r instanceof TUnion) return;
    ((TBase)r).setFieldValue(((TFieldIdEnum[])state)[pos], v);
  }

  /**
   * Get a field using a precomputed field table.
   *
   * @param record the Thrift struct to read; must be a {@link TBase}
   * @param name   the field name (unused; the position selects the field)
   * @param pos    the Avro field position, used as index into {@code state}
   * @param state  the {@code TFieldIdEnum[]} returned by {@link #getRecordState}
   * @return the field's value, or null when the field is not set on the struct
   */
  @Override
  protected Object getField(Object record, String name, int pos, Object state) {
    TFieldIdEnum f = ((TFieldIdEnum[])state)[pos];
    TBase struct = (TBase)record;
    if (struct.isSet(f))
      return struct.getFieldValue(f);
    return null;
  }

  /** Per-schema tables mapping Avro field position to Thrift field id. */
  private final Map<Schema,TFieldIdEnum[]> fieldCache =
    new ConcurrentHashMap<Schema,TFieldIdEnum[]>();

  /**
   * Return the field table for schema {@code s}: an array indexed by Avro field
   * position whose entries are the Thrift field ids of the same name, taken
   * from the struct metadata of {@code r}'s class. Tables are cached per schema.
   *
   * <p>NOTE(review): assumes every Thrift field name of {@code r}'s class is
   * also a field of {@code s}; a name missing from the schema is not handled.
   */
  @Override
  @SuppressWarnings("unchecked")
  protected Object getRecordState(Object r, Schema s) {
    TFieldIdEnum[] fields = fieldCache.get(s);
    if (fields == null) {                           // cache miss
      fields = new TFieldIdEnum[s.getFields().size()];
      Class c = r.getClass();
      for (TFieldIdEnum f :
             FieldMetaData.getStructMetaDataMap((Class<? extends TBase>) c).keySet())
        fields[s.getField(f.getFieldName()).pos()] = f;
      fieldCache.put(s, fields);                    // update cache
    }
    return fields;
  }

  /**
   * Return the Avro schema name used to resolve {@code datum} within a union.
   * {@link Short} and {@link Byte} values are reported as Avro INT; everything
   * else is delegated to the superclass.
   */
  @Override
  protected String getSchemaName(Object datum) {
    // support implicit conversion from thrift's i16
    // to avro INT for thrift's optional fields
    if (datum instanceof Short)
      return Schema.Type.INT.getName();
    // support implicit conversion from thrift's byte
    // to avro INT for thrift's optional fields
    if (datum instanceof Byte)
      return Schema.Type.INT.getName();
    return super.getSchemaName(datum);
  }

  /** A datum is a record when it is a Thrift struct ({@link TBase}). */
  @Override
  protected boolean isRecord(Object datum) {
    return datum instanceof TBase;
  }

  /** A datum is an enum when it is a Thrift enum ({@link TEnum}). */
  @Override
  protected boolean isEnum(Object datum) {
    return datum instanceof TEnum;
  }

  /** Return the enum schema derived from {@code datum}'s class. */
  @Override
  protected Schema getEnumSchema(Object datum) {
    return getSchema(datum.getClass());
  }

  /**
   * A datum is bytes when it is a {@link ByteBuffer} or a {@code byte[]}.
   * Both forms are accepted because setFieldValue takes ByteBuffer but
   * getFieldValue returns byte[]. Null is not bytes.
   */
  @Override
  protected boolean isBytes(Object datum) {
    if (datum instanceof ByteBuffer) return true;
    if (datum == null) return false;
    Class c = datum.getClass();
    return c.isArray() && c.getComponentType() == Byte.TYPE;
  }

  /**
   * Return an instance of the class named by {@code schema}.
   *
   * @param old    a candidate instance to reuse, may be null
   * @param schema the record schema whose class name selects the class to load
   * @return {@code old} when it is already an instance of the class, otherwise
   *         a new instance created through the class's no-argument constructor;
   *         when no class is resolved, the superclass's generic record
   * @throws RuntimeException wrapping any exception raised while loading or
   *         instantiating the class
   */
  @Override
  public Object newRecord(Object old, Schema schema) {
    try {
      Class c = ClassUtils.forName(SpecificData.getClassName(schema));
      if (c == null)
        // Delegate to GenericData; calling this method again here would
        // recurse without ever terminating.
        return super.newRecord(old, schema);        // punt to generic
      if (c.isInstance(old))
        return old;                                 // reuse instance
      return c.newInstance();                       // create new instance
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /** Return the record schema derived from {@code record}'s class. */
  @Override
  protected Schema getRecordSchema(Object record) {
    return getSchema(record.getClass());
  }

  /** Schemas already derived from Thrift-generated classes. */
  private final Map<Class,Schema> schemaCache
    = new ConcurrentHashMap<Class,Schema>();

  /**
   * Return a record schema given a thrift generated class.
   *
   * <p>A {@link TEnum} class yields an enum schema whose symbols are the names
   * of its enum constants. A {@link TBase} class yields a record schema named
   * after the class, with one field per entry of the Thrift struct metadata;
   * the boolean passed to {@code Schema.createRecord} is true when the class
   * is a {@link Throwable}. A field declared OPTIONAL whose schema is not
   * already a union is wrapped in a union with null. Results are cached per
   * class.
   *
   * @param c a Thrift-generated enum or struct class
   * @return the schema for {@code c}
   * @throws RuntimeException wrapping any exception raised while building the
   *         schema, including the one raised when {@code c} is neither a
   *         {@link TEnum} nor a {@link TBase}
   */
  @SuppressWarnings("unchecked")
  public Schema getSchema(Class c) {
    Schema schema = schemaCache.get(c);
    if (schema == null) {                           // cache miss
      try {
        if (TEnum.class.isAssignableFrom(c)) {      // enum
          List<String> symbols = new ArrayList<String>();
          for (Enum e : ((Class<? extends Enum>)c).getEnumConstants())
            symbols.add(e.name());
          schema = Schema.createEnum(c.getName(), null, null, symbols);
        } else if (TBase.class.isAssignableFrom(c)) { // struct
          schema = Schema.createRecord(c.getName(), null, null,
                                       Throwable.class.isAssignableFrom(c));
          List<Field> fields = new ArrayList<Field>();
          for (FieldMetaData f :
                 FieldMetaData.getStructMetaDataMap((Class<? extends TBase>) c).values()) {
            Schema s = getSchema(f.valueMetaData);
            if (f.requirementType == TFieldRequirementType.OPTIONAL
                && (s.getType() != Schema.Type.UNION))
              s = nullable(s);
            fields.add(new Field(f.fieldName, s, null, null));
          }
          schema.setFields(fields);
        } else {
          throw new RuntimeException("Not a Thrift-generated class: "+c);
        }
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
      schemaCache.put(c, schema);                   // update cache
    }
    return schema;
  }

  /** Schema of the Avro null type, used as the first branch of nullable unions. */
  private static final Schema NULL = Schema.create(Schema.Type.NULL);

  /**
   * Return the Avro schema for a Thrift field value type.
   *
   * <p>BOOL, I32, I64 and DOUBLE map to the matching Avro primitive. BYTE and
   * I16 map to INT tagged with the {@code thrift} property ("byte" / "short").
   * ENUM, LIST, MAP, SET, STRING and STRUCT map to a union of null and the
   * corresponding enum, array, map, array tagged "set", string-or-bytes, or
   * record schema. VOID maps to null.
   *
   * @throws AvroRuntimeException when a map's key type is not STRING
   * @throws RuntimeException when the Thrift type is not one of the above
   */
  private Schema getSchema(FieldValueMetaData f) {
    switch (f.type) {
    case TType.BOOL:
      return Schema.create(Schema.Type.BOOLEAN);
    case TType.BYTE:
      Schema b = Schema.create(Schema.Type.INT);
      b.addProp(THRIFT_PROP, "byte");
      return b;
    case TType.I16:
      Schema s = Schema.create(Schema.Type.INT);
      s.addProp(THRIFT_PROP, "short");
      return s;
    case TType.I32:
      return Schema.create(Schema.Type.INT);
    case TType.I64:
      return Schema.create(Schema.Type.LONG);
    case TType.DOUBLE:
      return Schema.create(Schema.Type.DOUBLE);
    case TType.ENUM:
      EnumMetaData enumMeta = (EnumMetaData)f;
      return nullable(getSchema(enumMeta.enumClass));
    case TType.LIST:
      ListMetaData listMeta = (ListMetaData)f;
      return nullable(Schema.createArray(getSchema(listMeta.elemMetaData)));
    case TType.MAP:
      MapMetaData mapMeta = (MapMetaData)f;
      if (mapMeta.keyMetaData.type != TType.STRING)
        throw new AvroRuntimeException("Map keys must be strings: "+f);
      Schema map = Schema.createMap(getSchema(mapMeta.valueMetaData));
      GenericData.setStringType(map, GenericData.StringType.String);
      return nullable(map);
    case TType.SET:
      SetMetaData setMeta = (SetMetaData)f;
      Schema set = Schema.createArray(getSchema(setMeta.elemMetaData));
      set.addProp(THRIFT_PROP, "set");
      return nullable(set);
    case TType.STRING:
      // Thrift reports binary fields with the STRING type code as well.
      if (f.isBinary())
        return nullable(Schema.create(Schema.Type.BYTES));
      Schema string = Schema.create(Schema.Type.STRING);
      GenericData.setStringType(string, GenericData.StringType.String);
      return nullable(string);
    case TType.STRUCT:
      StructMetaData structMeta = (StructMetaData)f;
      Schema record = getSchema(structMeta.structClass);
      return nullable(record);
    case TType.VOID:
      return NULL;
    default:
      throw new RuntimeException("Unexpected type in field: "+f);
    }
  }

  /** Return a union schema of null followed by {@code schema}. */
  private Schema nullable(Schema schema) {
    return Schema.createUnion(Arrays.asList(NULL, schema));
  }
}