| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.avro.thrift; |
| |
| import java.util.List; |
| import java.util.Arrays; |
| import java.util.ArrayList; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.nio.ByteBuffer; |
| |
| import org.apache.avro.Schema; |
| import org.apache.avro.AvroRuntimeException; |
| import org.apache.avro.Schema.Field; |
| import org.apache.avro.generic.GenericData; |
| import org.apache.avro.specific.SpecificData; |
| import org.apache.avro.io.DatumReader; |
| import org.apache.avro.io.DatumWriter; |
| |
| import org.apache.avro.util.ClassUtils; |
| import org.apache.thrift.TBase; |
| import org.apache.thrift.TEnum; |
| import org.apache.thrift.TFieldIdEnum; |
| import org.apache.thrift.TFieldRequirementType; |
| import org.apache.thrift.TUnion; |
| import org.apache.thrift.protocol.TType; |
| import org.apache.thrift.meta_data.FieldMetaData; |
| import org.apache.thrift.meta_data.FieldValueMetaData; |
| import org.apache.thrift.meta_data.EnumMetaData; |
| import org.apache.thrift.meta_data.ListMetaData; |
| import org.apache.thrift.meta_data.SetMetaData; |
| import org.apache.thrift.meta_data.MapMetaData; |
| import org.apache.thrift.meta_data.StructMetaData; |
| |
| /** Utilities for serializing Thrift data in Avro format. */ |
| public class ThriftData extends GenericData { |
| static final String THRIFT_TYPE = "thrift"; |
| static final String THRIFT_PROP = "thrift"; |
| |
| private static final ThriftData INSTANCE = new ThriftData(); |
| |
| protected ThriftData() {} |
| |
| /** Return the singleton instance. */ |
| public static ThriftData get() { return INSTANCE; } |
| |
| @Override |
| public DatumReader createDatumReader(Schema schema) { |
| return new ThriftDatumReader(schema, schema, this); |
| } |
| |
| @Override |
| public DatumWriter createDatumWriter(Schema schema) { |
| return new ThriftDatumWriter(schema, this); |
| } |
| |
| @Override |
| public void setField(Object r, String n, int pos, Object o) { |
| setField(r, n, pos, o, getRecordState(r, getSchema(r.getClass()))); |
| } |
| |
| @Override |
| public Object getField(Object r, String name, int pos) { |
| return getField(r, name, pos, getRecordState(r, getSchema(r.getClass()))); |
| } |
| |
| @Override |
| protected void setField(Object r, String n, int pos, Object v, Object state) { |
| if (v == null && r instanceof TUnion) return; |
| ((TBase)r).setFieldValue(((TFieldIdEnum[])state)[pos], v); |
| } |
| |
| @Override |
| protected Object getField(Object record, String name, int pos, Object state) { |
| TFieldIdEnum f = ((TFieldIdEnum[])state)[pos]; |
| TBase struct = (TBase)record; |
| if (struct.isSet(f)) |
| return struct.getFieldValue(f); |
| return null; |
| } |
| |
| private final Map<Schema,TFieldIdEnum[]> fieldCache = |
| new ConcurrentHashMap<Schema,TFieldIdEnum[]>(); |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| protected Object getRecordState(Object r, Schema s) { |
| TFieldIdEnum[] fields = fieldCache.get(s); |
| if (fields == null) { // cache miss |
| fields = new TFieldIdEnum[s.getFields().size()]; |
| Class c = r.getClass(); |
| for (TFieldIdEnum f : |
| FieldMetaData.getStructMetaDataMap((Class<? extends TBase>) c).keySet()) |
| fields[s.getField(f.getFieldName()).pos()] = f; |
| fieldCache.put(s, fields); // update cache |
| } |
| return fields; |
| } |
| |
| @Override |
| protected String getSchemaName(Object datum) { |
| // support implicit conversion from thrift's i16 |
| // to avro INT for thrift's optional fields |
| if (datum instanceof Short) |
| return Schema.Type.INT.getName(); |
| // support implicit conversion from thrift's byte |
| // to avro INT for thrift's optional fields |
| if (datum instanceof Byte) |
| return Schema.Type.INT.getName(); |
| |
| return super.getSchemaName(datum); |
| } |
| |
| @Override |
| protected boolean isRecord(Object datum) { |
| return datum instanceof TBase; |
| } |
| |
| @Override |
| protected boolean isEnum(Object datum) { |
| return datum instanceof TEnum; |
| } |
| |
| @Override |
| protected Schema getEnumSchema(Object datum) { |
| return getSchema(datum.getClass()); |
| } |
| |
| @Override |
| // setFieldValue takes ByteBuffer but getFieldValue returns byte[] |
| protected boolean isBytes(Object datum) { |
| if (datum instanceof ByteBuffer) return true; |
| if (datum == null) return false; |
| Class c = datum.getClass(); |
| return c.isArray() && c.getComponentType() == Byte.TYPE; |
| } |
| |
| @Override |
| public Object newRecord(Object old, Schema schema) { |
| try { |
| Class c = ClassUtils.forName(SpecificData.getClassName(schema)); |
| if (c == null) |
| return newRecord(old, schema); // punt to generic |
| if (c.isInstance(old)) |
| return old; // reuse instance |
| return c.newInstance(); // create new instance |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| @Override |
| protected Schema getRecordSchema(Object record) { |
| return getSchema(record.getClass()); |
| } |
| |
| private final Map<Class,Schema> schemaCache |
| = new ConcurrentHashMap<Class,Schema>(); |
| |
| /** Return a record schema given a thrift generated class. */ |
| @SuppressWarnings("unchecked") |
| public Schema getSchema(Class c) { |
| Schema schema = schemaCache.get(c); |
| |
| if (schema == null) { // cache miss |
| try { |
| if (TEnum.class.isAssignableFrom(c)) { // enum |
| List<String> symbols = new ArrayList<String>(); |
| for (Enum e : ((Class<? extends Enum>)c).getEnumConstants()) |
| symbols.add(e.name()); |
| schema = Schema.createEnum(c.getName(), null, null, symbols); |
| } else if (TBase.class.isAssignableFrom(c)) { // struct |
| schema = Schema.createRecord(c.getName(), null, null, |
| Throwable.class.isAssignableFrom(c)); |
| List<Field> fields = new ArrayList<Field>(); |
| for (FieldMetaData f : |
| FieldMetaData.getStructMetaDataMap((Class<? extends TBase>) c).values()) { |
| Schema s = getSchema(f.valueMetaData); |
| if (f.requirementType == TFieldRequirementType.OPTIONAL |
| && (s.getType() != Schema.Type.UNION)) |
| s = nullable(s); |
| fields.add(new Field(f.fieldName, s, null, null)); |
| } |
| schema.setFields(fields); |
| } else { |
| throw new RuntimeException("Not a Thrift-generated class: "+c); |
| } |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| schemaCache.put(c, schema); // update cache |
| } |
| return schema; |
| } |
| |
| private static final Schema NULL = Schema.create(Schema.Type.NULL); |
| |
| private Schema getSchema(FieldValueMetaData f) { |
| switch (f.type) { |
| case TType.BOOL: |
| return Schema.create(Schema.Type.BOOLEAN); |
| case TType.BYTE: |
| Schema b = Schema.create(Schema.Type.INT); |
| b.addProp(THRIFT_PROP, "byte"); |
| return b; |
| case TType.I16: |
| Schema s = Schema.create(Schema.Type.INT); |
| s.addProp(THRIFT_PROP, "short"); |
| return s; |
| case TType.I32: |
| return Schema.create(Schema.Type.INT); |
| case TType.I64: |
| return Schema.create(Schema.Type.LONG); |
| case TType.DOUBLE: |
| return Schema.create(Schema.Type.DOUBLE); |
| case TType.ENUM: |
| EnumMetaData enumMeta = (EnumMetaData)f; |
| return nullable(getSchema(enumMeta.enumClass)); |
| case TType.LIST: |
| ListMetaData listMeta = (ListMetaData)f; |
| return nullable(Schema.createArray(getSchema(listMeta.elemMetaData))); |
| case TType.MAP: |
| MapMetaData mapMeta = (MapMetaData)f; |
| if (mapMeta.keyMetaData.type != TType.STRING) |
| throw new AvroRuntimeException("Map keys must be strings: "+f); |
| Schema map = Schema.createMap(getSchema(mapMeta.valueMetaData)); |
| GenericData.setStringType(map, GenericData.StringType.String); |
| return nullable(map); |
| case TType.SET: |
| SetMetaData setMeta = (SetMetaData)f; |
| Schema set = Schema.createArray(getSchema(setMeta.elemMetaData)); |
| set.addProp(THRIFT_PROP, "set"); |
| return nullable(set); |
| case TType.STRING: |
| if (f.isBinary()) |
| return nullable(Schema.create(Schema.Type.BYTES)); |
| Schema string = Schema.create(Schema.Type.STRING); |
| GenericData.setStringType(string, GenericData.StringType.String); |
| return nullable(string); |
| case TType.STRUCT: |
| StructMetaData structMeta = (StructMetaData)f; |
| Schema record = getSchema(structMeta.structClass); |
| return nullable(record); |
| case TType.VOID: |
| return NULL; |
| default: |
| throw new RuntimeException("Unexpected type in field: "+f); |
| } |
| } |
| |
| private Schema nullable(Schema schema) { |
| return Schema.createUnion(Arrays.asList(new Schema[] {NULL, schema})); |
| } |
| |
| } |