blob: 77d01e98c2c5e50c23f30820d2fa084b7d17365a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.generic;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Collection;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.UnresolvedUnionException;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
/** {@link DatumWriter} for generic Java objects. */
public class GenericDatumWriter<D> implements DatumWriter<D> {
private final GenericData data;
private Schema root;
public GenericDatumWriter() {
this(GenericData.get());
}
protected GenericDatumWriter(GenericData data) {
this.data = data;
}
public GenericDatumWriter(Schema root) {
this();
setSchema(root);
}
public GenericDatumWriter(Schema root, GenericData data) {
this(data);
setSchema(root);
}
/** Return the {@link GenericData} implementation. */
public GenericData getData() {
return data;
}
public void setSchema(Schema root) {
this.root = root;
}
public void write(D datum, Encoder out) throws IOException {
Objects.requireNonNull(out, "Encoder cannot be null");
write(root, datum, out);
}
/** Called to write data. */
protected void write(Schema schema, Object datum, Encoder out) throws IOException {
LogicalType logicalType = schema.getLogicalType();
if (datum != null && logicalType != null) {
Conversion<?> conversion = getData().getConversionByClass(datum.getClass(), logicalType);
writeWithoutConversion(schema, convert(schema, logicalType, conversion, datum), out);
} else {
writeWithoutConversion(schema, datum, out);
}
}
/**
* Convert a high level representation of a logical type (such as a BigDecimal)
* to the its underlying representation object (such as a ByteBuffer).
*
* @throws IllegalArgumentException if a null schema or logicalType is passed in
* while datum and conversion are not null.
* Please be noticed that the exception type
* has changed. With version 1.8.0 and earlier,
* in above circumstance, the exception thrown
* out depends on the implementation of
* conversion (most likely a
* NullPointerException). Now, an
* IllegalArgumentException will be thrown out
* instead.
*/
protected <T> Object convert(Schema schema, LogicalType logicalType, Conversion<T> conversion, Object datum) {
try {
if (conversion == null) {
return datum;
} else {
return Conversions.convertToRawType(datum, schema, logicalType, conversion);
}
} catch (AvroRuntimeException e) {
Throwable cause = e.getCause();
if (cause != null && cause.getClass() == ClassCastException.class) {
// This is to keep backwards compatibility. The convert function here used to
// throw CCE. After being moved to Conversions, it throws AvroRuntimeException
// instead. To keep as much same behaviour as before, this function checks if
// the cause is a CCE. If yes, rethrow it in case any child class checks it.
// This
// behaviour can be changed later in future versions to make it consistent with
// reading path, which throws AvroRuntimeException
throw (ClassCastException) cause;
} else {
throw e;
}
}
}
/** Called to write data. */
protected void writeWithoutConversion(Schema schema, Object datum, Encoder out) throws IOException {
try {
switch (schema.getType()) {
case RECORD:
writeRecord(schema, datum, out);
break;
case ENUM:
writeEnum(schema, datum, out);
break;
case ARRAY:
writeArray(schema, datum, out);
break;
case MAP:
writeMap(schema, datum, out);
break;
case UNION:
int index = resolveUnion(schema, datum);
out.writeIndex(index);
write(schema.getTypes().get(index), datum, out);
break;
case FIXED:
writeFixed(schema, datum, out);
break;
case STRING:
writeString(schema, datum, out);
break;
case BYTES:
writeBytes(datum, out);
break;
case INT:
out.writeInt(((Number) datum).intValue());
break;
case LONG:
out.writeLong(((Number) datum).longValue());
break;
case FLOAT:
out.writeFloat(((Number) datum).floatValue());
break;
case DOUBLE:
out.writeDouble(((Number) datum).doubleValue());
break;
case BOOLEAN:
out.writeBoolean((Boolean) datum);
break;
case NULL:
out.writeNull();
break;
default:
error(schema, datum);
}
} catch (NullPointerException e) {
throw npe(e, " of " + schema.getFullName());
}
}
/** Helper method for adding a message to an NPE . */
protected NullPointerException npe(NullPointerException e, String s) {
NullPointerException result = new NullPointerException(e.getMessage() + s);
result.initCause(e.getCause() == null ? e : e.getCause());
return result;
}
/** Helper method for adding a message to an Class Cast Exception . */
protected ClassCastException addClassCastMsg(ClassCastException e, String s) {
ClassCastException result = new ClassCastException(e.getMessage() + s);
result.initCause(e.getCause() == null ? e : e.getCause());
return result;
}
/** Helper method for adding a message to an Avro Type Exception . */
protected AvroTypeException addAvroTypeMsg(AvroTypeException e, String s) {
AvroTypeException result = new AvroTypeException(e.getMessage() + s);
result.initCause(e.getCause() == null ? e : e.getCause());
return result;
}
/**
* Called to write a record. May be overridden for alternate record
* representations.
*/
protected void writeRecord(Schema schema, Object datum, Encoder out) throws IOException {
Object state = data.getRecordState(datum, schema);
for (Field f : schema.getFields()) {
writeField(datum, f, out, state);
}
}
/**
* Called to write a single field of a record. May be overridden for more
* efficient or alternate implementations.
*/
protected void writeField(Object datum, Field f, Encoder out, Object state) throws IOException {
Object value = data.getField(datum, f.name(), f.pos(), state);
try {
write(f.schema(), value, out);
} catch (final UnresolvedUnionException uue) { // recreate it with the right field info
final UnresolvedUnionException unresolvedUnionException = new UnresolvedUnionException(f.schema(), f, value);
unresolvedUnionException.addSuppressed(uue);
throw unresolvedUnionException;
} catch (NullPointerException e) {
throw npe(e, " in field " + f.name());
} catch (ClassCastException cce) {
throw addClassCastMsg(cce, " in field " + f.name());
} catch (AvroTypeException ate) {
throw addAvroTypeMsg(ate, " in field " + f.name());
}
}
/**
* Called to write an enum value. May be overridden for alternate enum
* representations.
*/
protected void writeEnum(Schema schema, Object datum, Encoder out) throws IOException {
if (!data.isEnum(datum))
throw new AvroTypeException("Not an enum: " + datum + " for schema: " + schema);
out.writeEnum(schema.getEnumOrdinal(datum.toString()));
}
/**
* Called to write a array. May be overridden for alternate array
* representations.
*/
protected void writeArray(Schema schema, Object datum, Encoder out) throws IOException {
Schema element = schema.getElementType();
long size = getArraySize(datum);
long actualSize = 0;
out.writeArrayStart();
out.setItemCount(size);
for (Iterator<? extends Object> it = getArrayElements(datum); it.hasNext();) {
out.startItem();
write(element, it.next(), out);
actualSize++;
}
out.writeArrayEnd();
if (actualSize != size) {
throw new ConcurrentModificationException(
"Size of array written was " + size + ", but number of elements written was " + actualSize + ". ");
}
}
/**
* Called to find the index for a datum within a union. By default calls
* {@link GenericData#resolveUnion(Schema,Object)}.
*/
protected int resolveUnion(Schema union, Object datum) {
return data.resolveUnion(union, datum);
}
/**
* Called by the default implementation of {@link #writeArray} to get the size
* of an array. The default implementation is for {@link Collection}.
*/
@SuppressWarnings("unchecked")
protected long getArraySize(Object array) {
return ((Collection) array).size();
}
/**
* Called by the default implementation of {@link #writeArray} to enumerate
* array elements. The default implementation is for {@link Collection}.
*/
@SuppressWarnings("unchecked")
protected Iterator<? extends Object> getArrayElements(Object array) {
return ((Collection) array).iterator();
}
/**
* Called to write a map. May be overridden for alternate map representations.
*/
protected void writeMap(Schema schema, Object datum, Encoder out) throws IOException {
Schema value = schema.getValueType();
int size = getMapSize(datum);
int actualSize = 0;
out.writeMapStart();
out.setItemCount(size);
for (Map.Entry<Object, Object> entry : getMapEntries(datum)) {
out.startItem();
writeString(entry.getKey().toString(), out);
write(value, entry.getValue(), out);
actualSize++;
}
out.writeMapEnd();
if (actualSize != size) {
throw new ConcurrentModificationException(
"Size of map written was " + size + ", but number of entries written was " + actualSize + ". ");
}
}
/**
* Called by the default implementation of {@link #writeMap} to get the size of
* a map. The default implementation is for {@link Map}.
*/
@SuppressWarnings("unchecked")
protected int getMapSize(Object map) {
return ((Map) map).size();
}
/**
* Called by the default implementation of {@link #writeMap} to enumerate map
* elements. The default implementation is for {@link Map}.
*/
@SuppressWarnings("unchecked")
protected Iterable<Map.Entry<Object, Object>> getMapEntries(Object map) {
return ((Map) map).entrySet();
}
/**
* Called to write a string. May be overridden for alternate string
* representations.
*/
protected void writeString(Schema schema, Object datum, Encoder out) throws IOException {
writeString(datum, out);
}
/**
* Called to write a string. May be overridden for alternate string
* representations.
*/
protected void writeString(Object datum, Encoder out) throws IOException {
out.writeString((CharSequence) datum);
}
/**
* Called to write a bytes. May be overridden for alternate bytes
* representations.
*/
protected void writeBytes(Object datum, Encoder out) throws IOException {
out.writeBytes((ByteBuffer) datum);
}
/**
* Called to write a fixed value. May be overridden for alternate fixed
* representations.
*/
protected void writeFixed(Schema schema, Object datum, Encoder out) throws IOException {
out.writeFixed(((GenericFixed) datum).bytes(), 0, schema.getFixedSize());
}
private void error(Schema schema, Object datum) {
throw new AvroTypeException("Not a " + schema + ": " + datum);
}
}