| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.avro.generic; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ConcurrentModificationException; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Collection; |
| |
| import org.apache.avro.AvroTypeException; |
| import org.apache.avro.Conversion; |
| import org.apache.avro.LogicalType; |
| import org.apache.avro.Schema; |
| import org.apache.avro.Schema.Field; |
| import org.apache.avro.io.DatumWriter; |
| import org.apache.avro.io.Encoder; |
| |
| /** {@link DatumWriter} for generic Java objects. */ |
| public class GenericDatumWriter<D> implements DatumWriter<D> { |
| private final GenericData data; |
| private Schema root; |
| |
| public GenericDatumWriter() { this(GenericData.get()); } |
| |
| protected GenericDatumWriter(GenericData data) { this.data = data; } |
| |
| public GenericDatumWriter(Schema root) { |
| this(); |
| setSchema(root); |
| } |
| |
| public GenericDatumWriter(Schema root, GenericData data) { |
| this(data); |
| setSchema(root); |
| } |
| |
| /** Return the {@link GenericData} implementation. */ |
| public GenericData getData() { return data; } |
| |
| public void setSchema(Schema root) { this.root = root; } |
| |
| public void write(D datum, Encoder out) throws IOException { |
| write(root, datum, out); |
| } |
| |
| /** Called to write data.*/ |
| protected void write(Schema schema, Object datum, Encoder out) |
| throws IOException { |
| LogicalType logicalType = schema.getLogicalType(); |
| if (datum != null && logicalType != null) { |
| Conversion<?> conversion = getData() |
| .getConversionByClass(datum.getClass(), logicalType); |
| writeWithoutConversion(schema, |
| convert(schema, logicalType, conversion, datum), out); |
| } else { |
| writeWithoutConversion(schema, datum, out); |
| } |
| } |
| |
| protected <T> Object convert(Schema schema, LogicalType logicalType, |
| Conversion<T> conversion, Object datum) { |
| if (conversion == null) { |
| return datum; |
| } |
| Class<T> fromClass = conversion.getConvertedType(); |
| switch (schema.getType()) { |
| case RECORD: return conversion.toRecord(fromClass.cast(datum), schema, logicalType); |
| case ENUM: return conversion.toEnumSymbol(fromClass.cast(datum), schema, logicalType); |
| case ARRAY: return conversion.toArray(fromClass.cast(datum), schema, logicalType); |
| case MAP: return conversion.toMap(fromClass.cast(datum), schema, logicalType); |
| case FIXED: return conversion.toFixed(fromClass.cast(datum), schema, logicalType); |
| case STRING: return conversion.toCharSequence(fromClass.cast(datum), schema, logicalType); |
| case BYTES: return conversion.toBytes(fromClass.cast(datum), schema, logicalType); |
| case INT: return conversion.toInt(fromClass.cast(datum), schema, logicalType); |
| case LONG: return conversion.toLong(fromClass.cast(datum), schema, logicalType); |
| case FLOAT: return conversion.toFloat(fromClass.cast(datum), schema, logicalType); |
| case DOUBLE: return conversion.toDouble(fromClass.cast(datum), schema, logicalType); |
| case BOOLEAN: return conversion.toBoolean(fromClass.cast(datum), schema, logicalType); |
| } |
| return datum; |
| } |
| |
| /** Called to write data.*/ |
| protected void writeWithoutConversion(Schema schema, Object datum, Encoder out) |
| throws IOException { |
| try { |
| switch (schema.getType()) { |
| case RECORD: writeRecord(schema, datum, out); break; |
| case ENUM: writeEnum(schema, datum, out); break; |
| case ARRAY: writeArray(schema, datum, out); break; |
| case MAP: writeMap(schema, datum, out); break; |
| case UNION: |
| int index = resolveUnion(schema, datum); |
| out.writeIndex(index); |
| write(schema.getTypes().get(index), datum, out); |
| break; |
| case FIXED: writeFixed(schema, datum, out); break; |
| case STRING: writeString(schema, datum, out); break; |
| case BYTES: writeBytes(datum, out); break; |
| case INT: out.writeInt(((Number)datum).intValue()); break; |
| case LONG: out.writeLong((Long)datum); break; |
| case FLOAT: out.writeFloat((Float)datum); break; |
| case DOUBLE: out.writeDouble((Double)datum); break; |
| case BOOLEAN: out.writeBoolean((Boolean)datum); break; |
| case NULL: out.writeNull(); break; |
| default: error(schema,datum); |
| } |
| } catch (NullPointerException e) { |
| throw npe(e, " of "+schema.getFullName()); |
| } |
| } |
| |
| /** Helper method for adding a message to an NPE. */ |
| protected NullPointerException npe(NullPointerException e, String s) { |
| NullPointerException result = new NullPointerException(e.getMessage()+s); |
| result.initCause(e.getCause() == null ? e : e.getCause()); |
| return result; |
| } |
| |
| /** Called to write a record. May be overridden for alternate record |
| * representations.*/ |
| protected void writeRecord(Schema schema, Object datum, Encoder out) |
| throws IOException { |
| Object state = data.getRecordState(datum, schema); |
| for (Field f : schema.getFields()) { |
| writeField(datum, f, out, state); |
| } |
| } |
| |
| /** Called to write a single field of a record. May be overridden for more |
| * efficient or alternate implementations.*/ |
| protected void writeField(Object datum, Field f, Encoder out, Object state) |
| throws IOException { |
| Object value = data.getField(datum, f.name(), f.pos(), state); |
| try { |
| write(f.schema(), value, out); |
| } catch (NullPointerException e) { |
| throw npe(e, " in field " + f.name()); |
| } |
| } |
| |
| /** Called to write an enum value. May be overridden for alternate enum |
| * representations.*/ |
| protected void writeEnum(Schema schema, Object datum, Encoder out) |
| throws IOException { |
| if (!data.isEnum(datum)) |
| throw new AvroTypeException("Not an enum: "+datum); |
| out.writeEnum(schema.getEnumOrdinal(datum.toString())); |
| } |
| |
| /** Called to write a array. May be overridden for alternate array |
| * representations.*/ |
| protected void writeArray(Schema schema, Object datum, Encoder out) |
| throws IOException { |
| Schema element = schema.getElementType(); |
| long size = getArraySize(datum); |
| long actualSize = 0; |
| out.writeArrayStart(); |
| out.setItemCount(size); |
| for (Iterator<? extends Object> it = getArrayElements(datum); it.hasNext();) { |
| out.startItem(); |
| write(element, it.next(), out); |
| actualSize++; |
| } |
| out.writeArrayEnd(); |
| if (actualSize != size) { |
| throw new ConcurrentModificationException("Size of array written was " + |
| size + ", but number of elements written was " + actualSize + ". "); |
| } |
| } |
| |
| /** Called to find the index for a datum within a union. By default calls |
| * {@link GenericData#resolveUnion(Schema,Object)}.*/ |
| protected int resolveUnion(Schema union, Object datum) { |
| return data.resolveUnion(union, datum); |
| } |
| |
| /** Called by the default implementation of {@link #writeArray} to get the |
| * size of an array. The default implementation is for {@link Collection}.*/ |
| @SuppressWarnings("unchecked") |
| protected long getArraySize(Object array) { |
| return ((Collection) array).size(); |
| } |
| |
| /** Called by the default implementation of {@link #writeArray} to enumerate |
| * array elements. The default implementation is for {@link Collection}.*/ |
| @SuppressWarnings("unchecked") |
| protected Iterator<? extends Object> getArrayElements(Object array) { |
| return ((Collection) array).iterator(); |
| } |
| |
| /** Called to write a map. May be overridden for alternate map |
| * representations.*/ |
| protected void writeMap(Schema schema, Object datum, Encoder out) |
| throws IOException { |
| Schema value = schema.getValueType(); |
| int size = getMapSize(datum); |
| int actualSize = 0; |
| out.writeMapStart(); |
| out.setItemCount(size); |
| for (Map.Entry<Object,Object> entry : getMapEntries(datum)) { |
| out.startItem(); |
| writeString(entry.getKey().toString(), out); |
| write(value, entry.getValue(), out); |
| actualSize++; |
| } |
| out.writeMapEnd(); |
| if (actualSize != size) { |
| throw new ConcurrentModificationException("Size of map written was " + |
| size + ", but number of entries written was " + actualSize + ". "); |
| } |
| } |
| |
| /** Called by the default implementation of {@link #writeMap} to get the size |
| * of a map. The default implementation is for {@link Map}.*/ |
| @SuppressWarnings("unchecked") |
| protected int getMapSize(Object map) { |
| return ((Map) map).size(); |
| } |
| |
| /** Called by the default implementation of {@link #writeMap} to enumerate |
| * map elements. The default implementation is for {@link Map}.*/ |
| @SuppressWarnings("unchecked") |
| protected Iterable<Map.Entry<Object,Object>> getMapEntries(Object map) { |
| return ((Map) map).entrySet(); |
| } |
| |
| /** Called to write a string. May be overridden for alternate string |
| * representations.*/ |
| protected void writeString(Schema schema, Object datum, Encoder out) |
| throws IOException { |
| writeString(datum, out); |
| } |
| /** Called to write a string. May be overridden for alternate string |
| * representations.*/ |
| protected void writeString(Object datum, Encoder out) throws IOException { |
| out.writeString((CharSequence) datum); |
| } |
| |
| /** Called to write a bytes. May be overridden for alternate bytes |
| * representations.*/ |
| protected void writeBytes(Object datum, Encoder out) throws IOException { |
| out.writeBytes((ByteBuffer)datum); |
| } |
| |
| /** Called to write a fixed value. May be overridden for alternate fixed |
| * representations.*/ |
| protected void writeFixed(Schema schema, Object datum, Encoder out) |
| throws IOException { |
| out.writeFixed(((GenericFixed)datum).bytes(), 0, schema.getFixedSize()); |
| } |
| |
| private void error(Schema schema, Object datum) { |
| throw new AvroTypeException("Not a "+schema+": "+datum); |
| } |
| |
| } |
| |