blob: 60fca8e8a65b149cba2c8f5bc83303ac370df2c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.generic;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import org.apache.avro.AvroMissingFieldException;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.AvroTypeException;
import org.apache.avro.Conversion;
import org.apache.avro.Conversions;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.UnresolvedUnionException;
import org.apache.avro.io.BinaryData;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.FastReaderBuilder;
import org.apache.avro.util.Utf8;
import org.apache.avro.util.internal.Accessor;
import com.fasterxml.jackson.databind.JsonNode;
/**
* Utilities for generic Java data. See {@link GenericRecordBuilder} for a
* convenient way to build {@link GenericRecord} instances.
*
* @see GenericRecordBuilder
*/
public class GenericData {
// Shared default instance handed out by get().
private static final GenericData INSTANCE = new GenericData();
/** Used to specify the Java type for a string schema. */
public enum StringType {
CharSequence, String, Utf8
};
// Schema property name under which setStringType records the Java string type.
public static final String STRING_PROP = "avro.java.string";
// Property value written by setStringType when StringType.String is requested.
protected static final String STRING_TYPE_STRING = "String";
// Class loader returned by getClassLoader(); assigned once in the constructor.
private final ClassLoader classLoader;
/**
* Set the Java type to be used when reading this schema. Meaningful only for
* string schemas and map schemas (for the keys).
*/
public static void setStringType(Schema s, StringType stringType) {
  // Utf8 is the default and already implements CharSequence, so those two
  // choices need no marker on the schema; only String is recorded.
  if (stringType != StringType.String) {
    return;
  }
  s.addProp(GenericData.STRING_PROP, GenericData.STRING_TYPE_STRING);
}
/** Return the singleton instance. */
public static GenericData get() {
// Always the same shared instance; nothing is created here.
return INSTANCE;
}
/** For subclasses. Applications normally use {@link GenericData#get()}. */
public GenericData() {
// A null loader makes the one-argument constructor fall back to the loader
// of the runtime class.
this(null);
}
/** For subclasses. GenericData does not use a ClassLoader. */
public GenericData(ClassLoader classLoader) {
  if (classLoader == null) {
    // No loader supplied: use the one that loaded the runtime class.
    this.classLoader = getClass().getClassLoader();
  } else {
    this.classLoader = classLoader;
  }
}
/** Return the class loader that's used (by subclasses). */
public ClassLoader getClassLoader() {
// Either the loader passed to the constructor or, if that was null, the
// loader of the runtime class.
return classLoader;
}
// Registered conversions keyed by logical type name; registering another
// conversion under the same name replaces the earlier entry.
private Map<String, Conversion<?>> conversions = new HashMap<>();
// Registered conversions grouped by converted Java class (keys compared by
// identity), then by logical type name.
private Map<Class<?>, Map<String, Conversion<?>>> conversionsByClass = new IdentityHashMap<>();
/**
* Returns the registered conversions, one per logical type name.
*
* @return a live view of the internal map's values, not a copy
*/
public Collection<Conversion<?>> getConversions() {
return conversions.values();
}
/**
* Registers the given conversion to be used when reading and writing with this
* data model.
*
* @param conversion a logical type Conversion.
*/
public void addLogicalTypeConversion(Conversion<?> conversion) {
  final String logicalTypeName = conversion.getLogicalTypeName();
  // Index by logical type name ...
  conversions.put(logicalTypeName, conversion);

  // ... and by converted Java class, creating the per-class map on first use.
  final Class<?> convertedType = conversion.getConvertedType();
  Map<String, Conversion<?>> byName = conversionsByClass.get(convertedType);
  if (byName == null) {
    byName = new LinkedHashMap<>();
    conversionsByClass.put(convertedType, byName);
  }
  byName.put(logicalTypeName, conversion);
}
/**
* Returns the first conversion found for the given class.
*
* @param datumClass a Class
* @return the first registered conversion for the class, or null
*/
@SuppressWarnings("unchecked")
public <T> Conversion<T> getConversionByClass(Class<T> datumClass) {
  final Map<String, Conversion<?>> registered = conversionsByClass.get(datumClass);
  if (registered == null) {
    return null;
  }
  // First entry in the per-class map's iteration order.
  return (Conversion<T>) registered.values().iterator().next();
}
/**
* Returns the conversion for the given class and logical type.
*
* @param datumClass a Class
* @param logicalType a LogicalType
* @return the conversion for the class and logical type, or null
*/
@SuppressWarnings("unchecked")
public <T> Conversion<T> getConversionByClass(Class<T> datumClass, LogicalType logicalType) {
  final Map<String, Conversion<?>> registered = conversionsByClass.get(datumClass);
  if (registered == null) {
    // Nothing registered for this class at all.
    return null;
  }
  final String logicalTypeName = logicalType.getName();
  return (Conversion<T>) registered.get(logicalTypeName);
}
/**
* Returns the Conversion for the given logical type.
*
* @param logicalType a logical type
* @return the conversion for the logical type, or null
*/
@SuppressWarnings("unchecked")
public Conversion<Object> getConversionFor(LogicalType logicalType) {
  // A schema without a logical type has no conversion by definition.
  return (logicalType == null) ? null : (Conversion<Object>) conversions.get(logicalType.getName());
}
/** Name of the system property read to initialise the fast-reader flag. */
public static final String FAST_READER_PROP = "org.apache.avro.fastread";
// Starts as true only when the system property equals "true", ignoring case;
// can be changed later through setFastReaderEnabled.
private boolean fastReaderEnabled = "true".equalsIgnoreCase(System.getProperty(FAST_READER_PROP));
// Created on first call to getFastReaderBuilder().
private FastReaderBuilder fastReaderBuilder = null;
/**
* Sets the fast-reader flag for this data model.
*
* @param flag the new value of the flag
* @return this instance, so calls can be chained
*/
public GenericData setFastReaderEnabled(boolean flag) {
this.fastReaderEnabled = flag;
return this;
}
/**
* Reports whether the fast reader should be used.
*
* @return true only if the flag is set and
*         {@code FastReaderBuilder.isSupportedData(this)} also returns true
*/
public boolean isFastReaderEnabled() {
return fastReaderEnabled && FastReaderBuilder.isSupportedData(this);
}
/**
* Returns the {@link FastReaderBuilder} bound to this data model, creating it
* on first use. The lazy initialisation is not synchronized.
*/
public FastReaderBuilder getFastReaderBuilder() {
  if (this.fastReaderBuilder != null) {
    return this.fastReaderBuilder;
  }
  this.fastReaderBuilder = new FastReaderBuilder(this);
  return this.fastReaderBuilder;
}
/**
* Default implementation of {@link GenericRecord}. Note that this
* implementation does not fill in default values for fields if they are not
* specified; use {@link GenericRecordBuilder} in that case.
*
* @see GenericRecordBuilder
*/
public static class Record implements GenericRecord, Comparable<Record> {
  private final Schema schema;
  // One slot per schema field, indexed by field position.
  private final Object[] values;

  /** Creates an empty record; every field value starts as null. */
  public Record(Schema schema) {
    final boolean recordSchema = schema != null && Type.RECORD.equals(schema.getType());
    if (!recordSchema) {
      throw new AvroRuntimeException("Not a record schema: " + schema);
    }
    this.schema = schema;
    this.values = new Object[schema.getFields().size()];
  }

  /**
   * Copy constructor.
   *
   * @param other    the record to copy
   * @param deepCopy if true each value is deep-copied via the shared
   *                 GenericData instance; otherwise references are shared
   */
  public Record(Record other, boolean deepCopy) {
    this.schema = other.schema;
    this.values = new Object[schema.getFields().size()];
    if (!deepCopy) {
      System.arraycopy(other.values, 0, values, 0, other.values.length);
      return;
    }
    for (int pos = 0; pos < values.length; pos++) {
      final Schema fieldSchema = schema.getFields().get(pos).schema();
      values[pos] = INSTANCE.deepCopy(fieldSchema, other.values[pos]);
    }
  }

  @Override
  public Schema getSchema() {
    return schema;
  }

  @Override
  public void put(String key, Object value) {
    values[requireField(key).pos()] = value;
  }

  @Override
  public void put(int i, Object v) {
    values[i] = v;
  }

  @Override
  public Object get(String key) {
    return values[requireField(key).pos()];
  }

  @Override
  public Object get(int i) {
    return values[i];
  }

  /** Looks up a field by name, rejecting names the schema does not declare. */
  private Field requireField(String key) {
    final Field found = schema.getField(key);
    if (found == null) {
      throw new AvroRuntimeException("Not a valid schema field: " + key);
    }
    return found;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true; // identical object
    }
    if (!(o instanceof Record)) {
      return false; // not a record
    }
    final Record that = (Record) o;
    // Different schemas are never equal; otherwise defer to the
    // equality-only mode of the schema-driven comparison.
    return this.schema.equals(that.schema) && GenericData.get().compare(this, that, schema, true) == 0;
  }

  @Override
  public int hashCode() {
    return GenericData.get().hashCode(this, schema);
  }

  @Override
  public int compareTo(Record that) {
    return GenericData.get().compare(this, that, schema);
  }

  @Override
  public String toString() {
    return GenericData.get().toString(this);
  }
}
/** Default implementation of an array. */
@SuppressWarnings(value = "unchecked")
public static class Array<T> extends AbstractList<T> implements GenericArray<T>, Comparable<GenericArray<T>> {
  // Shared zero-length backing store used until the first element is added.
  private static final Object[] EMPTY = new Object[0];
  private final Schema schema;
  // Number of live elements. Slots at index >= size may still hold old values:
  // reset() leaves them in place, peek() exposes them and prune() clears them.
  private int size;
  private Object[] elements = EMPTY;

  /**
   * Creates an empty array with room for {@code capacity} elements.
   *
   * @throws AvroRuntimeException if {@code schema} is null or not an array schema
   */
  public Array(int capacity, Schema schema) {
    if (schema == null || !Type.ARRAY.equals(schema.getType()))
      throw new AvroRuntimeException("Not an array schema: " + schema);
    this.schema = schema;
    if (capacity != 0)
      elements = new Object[capacity];
  }

  /**
   * Creates an array holding the elements of {@code c}; a null collection
   * yields an empty array.
   *
   * @throws AvroRuntimeException if {@code schema} is null or not an array schema
   */
  public Array(Schema schema, Collection<T> c) {
    if (schema == null || !Type.ARRAY.equals(schema.getType()))
      throw new AvroRuntimeException("Not an array schema: " + schema);
    this.schema = schema;
    if (c != null) {
      elements = new Object[c.size()];
      addAll(c);
    }
  }

  @Override
  public Schema getSchema() {
    return schema;
  }

  @Override
  public int size() {
    return size;
  }

  @Override
  public void clear() {
    // Let GC do its work
    Arrays.fill(elements, 0, size, null);
    size = 0;
  }

  /** Empties the array but keeps the old element objects in the backing store. */
  @Override
  public void reset() {
    size = 0;
  }

  /** Nulls every backing-store slot beyond the live elements. */
  @Override
  public void prune() {
    if (size < elements.length) {
      Arrays.fill(elements, size, elements.length, null);
    }
  }

  @Override
  public Iterator<T> iterator() {
    return new Iterator<T>() {
      private int position = 0;

      @Override
      public boolean hasNext() {
        return position < size;
      }

      @Override
      public T next() {
        return (T) elements[position++];
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }
    };
  }

  @Override
  public T get(int i) {
    if (i >= size)
      throw new IndexOutOfBoundsException("Index " + i + " out of bounds.");
    return (T) elements[i];
  }

  @Override
  public void add(int location, T o) {
    if (location > size || location < 0) {
      throw new IndexOutOfBoundsException("Index " + location + " out of bounds.");
    }
    if (size == elements.length) {
      // Increase size by 1.5x + 1
      final int newSize = size + (size >> 1) + 1;
      elements = Arrays.copyOf(elements, newSize);
    }
    // Shift the tail right by one to open a gap at the insertion point.
    System.arraycopy(elements, location, elements, location + 1, size - location);
    elements[location] = o;
    size++;
  }

  @Override
  public T set(int i, T o) {
    if (i >= size)
      throw new IndexOutOfBoundsException("Index " + i + " out of bounds.");
    T response = (T) elements[i];
    elements[i] = o;
    return response;
  }

  @Override
  public T remove(int i) {
    if (i >= size)
      throw new IndexOutOfBoundsException("Index " + i + " out of bounds.");
    T result = (T) elements[i];
    --size;
    // Shift the tail left by one over the removed slot.
    System.arraycopy(elements, i + 1, elements, i, (size - i));
    elements[size] = null;
    return result;
  }

  /**
   * Returns whatever object sits in the slot just past the live elements, or
   * null if the backing store is full.
   */
  @Override
  public T peek() {
    return (size < elements.length) ? (T) elements[size] : null;
  }

  @Override
  public int compareTo(GenericArray<T> that) {
    return GenericData.get().compare(this, that, this.getSchema());
  }

  /** Reverses the live elements in place. */
  @Override
  public void reverse() {
    int left = 0;
    // Bound by size, not by capacity: the backing store may be longer than
    // the live region, and swapping against its tail would pull unused or
    // stale slots into the front of the list.
    int right = size - 1;
    while (left < right) {
      Object tmp = elements[left];
      elements[left] = elements[right];
      elements[right] = tmp;
      left++;
      right--;
    }
  }
}
/** Default implementation of {@link GenericFixed}. */
public static class Fixed implements GenericFixed, Comparable<Fixed> {
  private Schema schema;
  private byte[] bytes;

  /** Creates a fixed whose byte array has the size declared by the schema. */
  public Fixed(Schema schema) {
    setSchema(schema);
  }

  /** Wraps the given array without copying it. */
  public Fixed(Schema schema, byte[] bytes) {
    this.schema = schema;
    this.bytes = bytes;
  }

  /** Leaves schema and bytes unset; pair with {@link #setSchema(Schema)}. */
  protected Fixed() {
  }

  /** Stores the schema and replaces the bytes with a fresh array of the schema's fixed size. */
  protected void setSchema(Schema schema) {
    this.schema = schema;
    final int fixedSize = schema.getFixedSize();
    this.bytes = new byte[fixedSize];
  }

  @Override
  public Schema getSchema() {
    return schema;
  }

  /** Replaces the backing array; the argument is stored as-is. */
  public void bytes(byte[] bytes) {
    this.bytes = bytes;
  }

  @Override
  public byte[] bytes() {
    return bytes;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof GenericFixed)) {
      return false;
    }
    // Equality looks at the bytes only; the schema is not compared.
    final GenericFixed other = (GenericFixed) o;
    return Arrays.equals(bytes, other.bytes());
  }

  @Override
  public int hashCode() {
    return Arrays.hashCode(bytes);
  }

  @Override
  public String toString() {
    return Arrays.toString(bytes);
  }

  @Override
  public int compareTo(Fixed that) {
    return BinaryData.compareBytes(this.bytes, 0, this.bytes.length, that.bytes, 0, that.bytes.length);
  }
}
/** Default implementation of {@link GenericEnumSymbol}. */
public static class EnumSymbol implements GenericEnumSymbol<EnumSymbol> {
  private Schema schema;
  private String symbol;

  public EnumSymbol(Schema schema, String symbol) {
    this.schema = schema;
    this.symbol = symbol;
  }

  /**
   * Maps existing Objects into an Avro enum by calling toString(), eg for Java
   * Enums
   */
  public EnumSymbol(Schema schema, Object symbol) {
    this(schema, symbol.toString());
  }

  @Override
  public Schema getSchema() {
    return schema;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof GenericEnumSymbol)) {
      return false;
    }
    // Symbols are compared by their string form; the schema is not compared.
    return symbol.equals(o.toString());
  }

  @Override
  public int hashCode() {
    return symbol.hashCode();
  }

  @Override
  public String toString() {
    return symbol;
  }

  @Override
  public int compareTo(EnumSymbol that) {
    return GenericData.get().compare(this, that, schema);
  }
}
/** Returns a {@link DatumReader} for this kind of data. */
public DatumReader createDatumReader(Schema schema) {
// The single schema serves as both writer and reader schema.
return createDatumReader(schema, schema);
}
/** Returns a {@link DatumReader} for this kind of data. */
public DatumReader createDatumReader(Schema writer, Schema reader) {
// This data model is handed to the reader along with the two schemas.
return new GenericDatumReader(writer, reader, this);
}
/** Returns a {@link DatumWriter} for this kind of data. */
public DatumWriter createDatumWriter(Schema schema) {
// This data model is handed to the writer along with the schema.
return new GenericDatumWriter(schema, this);
}
/** Returns true if a Java datum matches a schema. */
public boolean validate(Schema schema, Object datum) {
// Dispatch on the schema type; container types recurse into their parts and
// fail fast on the first mismatch.
switch (schema.getType()) {
case RECORD:
if (!isRecord(datum))
return false;
// Every declared field must validate against its own schema.
for (Field f : schema.getFields()) {
if (!validate(f.schema(), getField(datum, f.name(), f.pos())))
return false;
}
return true;
case ENUM:
if (!isEnum(datum))
return false;
// The symbol is matched by its string form against the schema's symbols.
return schema.getEnumSymbols().contains(datum.toString());
case ARRAY:
if (!(isArray(datum)))
return false;
for (Object element : getArrayAsCollection(datum))
if (!validate(schema.getElementType(), element))
return false;
return true;
case MAP:
if (!(isMap(datum)))
return false;
@SuppressWarnings(value = "unchecked")
Map<Object, Object> map = (Map<Object, Object>) datum;
// Only the values are validated here; keys are not inspected.
for (Map.Entry<Object, Object> entry : map.entrySet())
if (!validate(schema.getValueType(), entry.getValue()))
return false;
return true;
case UNION:
try {
// Pick the branch the datum resolves to, then validate against that branch.
int i = resolveUnion(schema, datum);
return validate(schema.getTypes().get(i), datum);
} catch (UnresolvedUnionException e) {
// No branch matches the datum, so it is simply invalid.
return false;
}
case FIXED:
// Checks GenericFixed directly (not isFixed) and requires the exact size.
return datum instanceof GenericFixed && ((GenericFixed) datum).bytes().length == schema.getFixedSize();
case STRING:
return isString(datum);
case BYTES:
return isBytes(datum);
case INT:
return isInteger(datum);
case LONG:
return isLong(datum);
case FLOAT:
return isFloat(datum);
case DOUBLE:
return isDouble(datum);
case BOOLEAN:
return isBoolean(datum);
case NULL:
return datum == null;
default:
return false;
}
}
/** Renders a Java datum as <a href="https://www.json.org/">JSON</a>. */
public String toString(Object datum) {
StringBuilder buffer = new StringBuilder();
// A fresh identity-keyed map per call tracks the containers currently being
// rendered, which is how circular references are detected.
toString(datum, buffer, new IdentityHashMap<>(128));
return buffer.toString();
}
// Text written in place of a container that is already being rendered further
// up the toString recursion.
private static final String TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT = " \">>> CIRCULAR REFERENCE CANNOT BE PUT IN JSON STRING, ABORTING RECURSION <<<\" ";
/** Renders a Java datum as <a href="https://www.json.org/">JSON</a>. */
protected void toString(Object datum, StringBuilder buffer, IdentityHashMap<Object, Object> seenObjects) {
// The branch order matters: records, arrays and maps are tested before the
// scalar cases. Each container registers itself in seenObjects while its
// children are rendered and removes itself afterwards, so only true cycles
// (not repeated siblings) trigger the circular-reference text.
if (isRecord(datum)) {
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
buffer.append("{");
int count = 0;
Schema schema = getRecordSchema(datum);
for (Field f : schema.getFields()) {
// The field name is a String, so this call renders it quoted and escaped.
toString(f.name(), buffer, seenObjects);
buffer.append(": ");
toString(getField(datum, f.name(), f.pos()), buffer, seenObjects);
// Separator after every field except the last.
if (++count < schema.getFields().size())
buffer.append(", ");
}
buffer.append("}");
seenObjects.remove(datum);
} else if (isArray(datum)) {
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
Collection<?> array = getArrayAsCollection(datum);
buffer.append("[");
long last = array.size() - 1;
int i = 0;
for (Object element : array) {
toString(element, buffer, seenObjects);
// Separator after every element except the last.
if (i++ < last)
buffer.append(", ");
}
buffer.append("]");
seenObjects.remove(datum);
} else if (isMap(datum)) {
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
buffer.append("{");
int count = 0;
@SuppressWarnings(value = "unchecked")
Map<Object, Object> map = (Map<Object, Object>) datum;
for (Map.Entry<Object, Object> entry : map.entrySet()) {
// Keys are rendered through String.valueOf, so a null key prints as "null".
buffer.append("\"");
writeEscapedString(String.valueOf(entry.getKey()), buffer);
buffer.append("\": ");
toString(entry.getValue(), buffer, seenObjects);
if (++count < map.size())
buffer.append(", ");
}
buffer.append("}");
seenObjects.remove(datum);
} else if (isString(datum) || isEnum(datum)) {
buffer.append("\"");
writeEscapedString(datum.toString(), buffer);
buffer.append("\"");
} else if (isBytes(datum)) {
buffer.append("\"");
// Work on a duplicate so the caller's buffer position is left untouched;
// each byte maps to one char via ISO-8859-1.
ByteBuffer bytes = ((ByteBuffer) datum).duplicate();
writeEscapedString(StandardCharsets.ISO_8859_1.decode(bytes), buffer);
buffer.append("\"");
} else if (((datum instanceof Float) && // quote Nan & Infinity
(((Float) datum).isInfinite() || ((Float) datum).isNaN()))
|| ((datum instanceof Double) && (((Double) datum).isInfinite() || ((Double) datum).isNaN()))) {
buffer.append("\"");
buffer.append(datum);
buffer.append("\"");
} else if (datum instanceof GenericData) {
// NOTE(review): the nested call passes the same datum, which is now in
// seenObjects; unless an earlier branch claims it, the nested call lands
// here again and appends the circular-reference text. Confirm this is the
// intended rendering for GenericData instances.
if (seenObjects.containsKey(datum)) {
buffer.append(TOSTRING_CIRCULAR_REFERENCE_ERROR_TEXT);
return;
}
seenObjects.put(datum, datum);
toString(datum, buffer, seenObjects);
seenObjects.remove(datum);
} else {
// Everything else (numbers, booleans, null, unknown objects) is appended
// using StringBuilder's default conversion.
buffer.append(datum);
}
}
/* Adapted from https://code.google.com/p/json-simple */
private static void writeEscapedString(CharSequence string, StringBuilder builder) {
  for (int index = 0; index < string.length(); index++) {
    final char current = string.charAt(index);
    if (current == '"') {
      builder.append("\\\"");
    } else if (current == '\\') {
      builder.append("\\\\");
    } else if (current == '\b') {
      builder.append("\\b");
    } else if (current == '\f') {
      builder.append("\\f");
    } else if (current == '\n') {
      builder.append("\\n");
    } else if (current == '\r') {
      builder.append("\\r");
    } else if (current == '\t') {
      builder.append("\\t");
    } else if (current <= '\u001F' || (current >= '\u007F' && current <= '\u009F')
        || (current >= '\u2000' && current <= '\u20FF')) {
      // Reference: https://www.unicode.org/versions/Unicode5.1.0/
      // Remaining control characters and the U+2000..U+20FF block are written
      // as a four-digit, upper-case \\uXXXX escape.
      final String hex = Integer.toHexString(current).toUpperCase();
      builder.append("\\u");
      for (int digits = hex.length(); digits < 4; digits++) {
        builder.append('0');
      }
      builder.append(hex);
    } else {
      builder.append(current);
    }
  }
}
/** Create a schema given an example datum. */
public Schema induce(Object datum) {
// The order of the tests matters: container checks come first, then scalars,
// and null is only recognised at the very end.
if (isRecord(datum)) {
// Records already carry their schema.
return getRecordSchema(datum);
} else if (isArray(datum)) {
// All elements must induce the same schema; the first one sets it.
Schema elementType = null;
for (Object element : getArrayAsCollection(datum)) {
if (elementType == null) {
elementType = induce(element);
} else if (!elementType.equals(induce(element))) {
throw new AvroTypeException("No mixed type arrays.");
}
}
// An empty array gives nothing to infer the element type from.
if (elementType == null) {
throw new AvroTypeException("Empty array: " + datum);
}
return Schema.createArray(elementType);
} else if (isMap(datum)) {
@SuppressWarnings(value = "unchecked")
Map<Object, Object> map = (Map<Object, Object>) datum;
// Same rule as arrays, applied to the map's values; keys are not inspected.
Schema value = null;
for (Map.Entry<Object, Object> entry : map.entrySet()) {
if (value == null) {
value = induce(entry.getValue());
} else if (!value.equals(induce(entry.getValue()))) {
throw new AvroTypeException("No mixed type map values.");
}
}
if (value == null) {
throw new AvroTypeException("Empty map: " + datum);
}
return Schema.createMap(value);
} else if (datum instanceof GenericFixed) {
// A new fixed schema is built from the byte length alone; name, doc and
// namespace are passed as null.
return Schema.createFixed(null, null, null, ((GenericFixed) datum).bytes().length);
} else if (isString(datum))
return Schema.create(Type.STRING);
else if (isBytes(datum))
return Schema.create(Type.BYTES);
else if (isInteger(datum))
return Schema.create(Type.INT);
else if (isLong(datum))
return Schema.create(Type.LONG);
else if (isFloat(datum))
return Schema.create(Type.FLOAT);
else if (isDouble(datum))
return Schema.create(Type.DOUBLE);
else if (isBoolean(datum))
return Schema.create(Type.BOOLEAN);
else if (datum == null)
return Schema.create(Type.NULL);
else
throw new AvroTypeException("Can't create schema for: " + datum);
}
/**
* Called by {@link GenericDatumReader#readRecord} to set a record fields value
* to a record instance. The default implementation is for
* {@link IndexedRecord}.
*/
public void setField(Object record, String name, int position, Object value) {
// The name is unused here; the record is addressed by position only.
((IndexedRecord) record).put(position, value);
}
/**
* Called by {@link GenericDatumReader#readRecord} to retrieve a record field
* value from a reused instance. The default implementation is for
* {@link IndexedRecord}.
*/
public Object getField(Object record, String name, int position) {
// The name is unused here; the record is addressed by position only.
return ((IndexedRecord) record).get(position);
}
/**
* Produce state for repeated calls to
* {@link #getField(Object,String,int,Object)} and
* {@link #setField(Object,String,int,Object,Object)} on the same record.
*/
protected Object getRecordState(Object record, Schema schema) {
// This implementation keeps no per-record state.
return null;
}
/** Version of {@link #setField} that has state. */
protected void setField(Object record, String name, int position, Object value, Object state) {
// state is ignored here; the call is forwarded to the stateless overload.
setField(record, name, position, value);
}
/** Version of {@link #getField} that has state. */
protected Object getField(Object record, String name, int pos, Object state) {
// state is ignored here; the call is forwarded to the stateless overload.
return getField(record, name, pos);
}
/**
* Return the index for a datum within a union. Implemented with
* {@link Schema#getIndexNamed(String)} and {@link #getSchemaName(Object)}.
*/
public int resolveUnion(Schema union, Object datum) {
  // Conversions registered for the datum's exact class win over the built-in
  // mapping. This lets a logical type's Java class overlap with a natively
  // supported one, e.g. a conversion whose converted type is a map.
  if (datum != null) {
    final Map<String, Conversion<?>> byLogicalType = conversionsByClass.get(datum.getClass());
    if (byLogicalType != null) {
      int index = 0;
      for (Schema branch : union.getTypes()) {
        final LogicalType branchLogicalType = branch.getLogicalType();
        if (branchLogicalType != null && byLogicalType.get(branchLogicalType.getName()) != null) {
          return index;
        }
        index++;
      }
    }
  }

  // Otherwise look the branch up by the datum's schema name.
  final Integer namedIndex = union.getIndexNamed(getSchemaName(datum));
  if (namedIndex == null) {
    throw new UnresolvedUnionException(union, datum);
  }
  return namedIndex;
}
/**
* Return the schema full name for a datum. Called by
* {@link #resolveUnion(Schema,Object)}.
*/
protected String getSchemaName(Object datum) {
// Both a Java null and the JsonProperties.NULL_VALUE sentinel map to "null".
if (datum == null || datum == JsonProperties.NULL_VALUE)
return Type.NULL.getName();
// Named types (record, enum, fixed) answer with their schema's full name;
// every other type answers with the name of its Avro type. The checks run in
// the order written, so a datum matching several predicates takes the first.
if (isRecord(datum))
return getRecordSchema(datum).getFullName();
if (isEnum(datum))
return getEnumSchema(datum).getFullName();
if (isArray(datum))
return Type.ARRAY.getName();
if (isMap(datum))
return Type.MAP.getName();
if (isFixed(datum))
return getFixedSchema(datum).getFullName();
if (isString(datum))
return Type.STRING.getName();
if (isBytes(datum))
return Type.BYTES.getName();
if (isInteger(datum))
return Type.INT.getName();
if (isLong(datum))
return Type.LONG.getName();
if (isFloat(datum))
return Type.FLOAT.getName();
if (isDouble(datum))
return Type.DOUBLE.getName();
if (isBoolean(datum))
return Type.BOOLEAN.getName();
throw new AvroRuntimeException(String.format("Unknown datum type %s: %s", datum.getClass().getName(), datum));
}
/**
* Called by {@link #resolveUnion(Schema,Object)}. May be overridden for
* alternate data representations.
*/
protected boolean instanceOf(Schema schema, Object datum) {
// Named types must match both the Java representation and the schema's full
// name; unnamed types only need the right Java representation.
switch (schema.getType()) {
case RECORD:
if (!isRecord(datum))
return false;
// A null full name on the schema only matches a null full name on the datum.
return (schema.getFullName() == null) ? getRecordSchema(datum).getFullName() == null
: schema.getFullName().equals(getRecordSchema(datum).getFullName());
case ENUM:
if (!isEnum(datum))
return false;
return schema.getFullName().equals(getEnumSchema(datum).getFullName());
case ARRAY:
return isArray(datum);
case MAP:
return isMap(datum);
case FIXED:
if (!isFixed(datum))
return false;
return schema.getFullName().equals(getFixedSchema(datum).getFullName());
case STRING:
return isString(datum);
case BYTES:
return isBytes(datum);
case INT:
return isInteger(datum);
case LONG:
return isLong(datum);
case FLOAT:
return isFloat(datum);
case DOUBLE:
return isDouble(datum);
case BOOLEAN:
return isBoolean(datum);
case NULL:
return datum == null;
default:
throw new AvroRuntimeException("Unexpected type: " + schema);
}
}
/** Called by the default implementation of {@link #instanceOf}. */
protected boolean isArray(Object datum) {
// Any java.util.Collection is accepted as an array.
return datum instanceof Collection;
}
/** Called to access an array as a collection. */
protected Collection getArrayAsCollection(Object datum) {
// Plain downcast; fails with ClassCastException if datum is not a Collection.
return (Collection) datum;
}
/** Called by the default implementation of {@link #instanceOf}. */
protected boolean isRecord(Object datum) {
// Records are recognised by the IndexedRecord interface.
return datum instanceof IndexedRecord;
}
/**
* Called to obtain the schema of a record. By default calls
* {@link GenericContainer#getSchema()}. May be overridden for alternate record
* representations.
*/
protected Schema getRecordSchema(Object record) {
// Plain downcast; fails with ClassCastException for non-GenericContainer input.
return ((GenericContainer) record).getSchema();
}
/** Called by the default implementation of {@link #instanceOf}. */
protected boolean isEnum(Object datum) {
// Enums are recognised by the GenericEnumSymbol interface.
return datum instanceof GenericEnumSymbol;
}
/**
* Called to obtain the schema of an enum. By default calls
* {@link GenericContainer#getSchema()}. May be overridden for alternate enum
* representations.
*/
protected Schema getEnumSchema(Object enu) {
// Plain downcast; fails with ClassCastException for non-GenericContainer input.
return ((GenericContainer) enu).getSchema();
}
/** Called by the default implementation of {@link #instanceOf}. */
protected boolean isMap(Object datum) {
// Any java.util.Map is accepted as a map.
return datum instanceof Map;
}
/** Called by the default implementation of {@link #instanceOf}. */
protected boolean isFixed(Object datum) {
// Fixed values are recognised by the GenericFixed interface.
return datum instanceof GenericFixed;
}
/**
* Called to obtain the schema of a fixed. By default calls
* {@link GenericContainer#getSchema()}. May be overridden for alternate fixed
* representations.
*/
protected Schema getFixedSchema(Object fixed) {
// Plain downcast; fails with ClassCastException for non-GenericContainer input.
return ((GenericContainer) fixed).getSchema();
}
/** Called by the default implementation of {@link #instanceOf}. */
protected boolean isString(Object datum) {
// Any CharSequence counts, which covers java.lang.String among others.
return datum instanceof CharSequence;
}
/** Called by the default implementation of {@link #instanceOf}. */
protected boolean isBytes(Object datum) {
// Only ByteBuffer is accepted; a raw byte[] does not match.
return datum instanceof ByteBuffer;
}
/**
* Called by the default implementation of {@link #instanceOf}.
*/
protected boolean isInteger(Object datum) {
// Exact boxed type only; other Number subclasses do not match.
return datum instanceof Integer;
}
/**
* Called by the default implementation of {@link #instanceOf}.
*/
protected boolean isLong(Object datum) {
// Exact boxed type only; an Integer is not accepted as a long.
return datum instanceof Long;
}
/**
* Called by the default implementation of {@link #instanceOf}.
*/
protected boolean isFloat(Object datum) {
// Exact boxed type only; other Number subclasses do not match.
return datum instanceof Float;
}
/**
* Called by the default implementation of {@link #instanceOf}.
*/
protected boolean isDouble(Object datum) {
// Exact boxed type only; a Float is not accepted as a double.
return datum instanceof Double;
}
/**
* Called by the default implementation of {@link #instanceOf}.
*/
protected boolean isBoolean(Object datum) {
// Matches boxed Boolean values only.
return datum instanceof Boolean;
}
/**
* Compute a hash code according to a schema, consistent with
* {@link #compare(Object,Object,Schema)}.
*/
public int hashCode(Object o, Schema s) {
  if (o == null) {
    return 0; // incomplete datum
  }
  switch (s.getType()) {
  case RECORD: {
    int recordHash = 1;
    for (Field field : s.getFields()) {
      // Fields with IGNORE order take no part in the hash.
      if (field.order() != Field.Order.IGNORE) {
        recordHash = hashCodeAdd(recordHash, getField(o, field.name(), field.pos()), field.schema());
      }
    }
    return recordHash;
  }
  case ARRAY: {
    final Collection<?> elements = (Collection<?>) o;
    final Schema elementSchema = s.getElementType();
    int arrayHash = 1;
    for (Object element : elements) {
      arrayHash = hashCodeAdd(arrayHash, element, elementSchema);
    }
    return arrayHash;
  }
  case UNION: {
    // Hash according to whichever branch the datum resolves to.
    final Schema branch = s.getTypes().get(resolveUnion(s, o));
    return hashCode(o, branch);
  }
  case ENUM:
    return s.getEnumOrdinal(o.toString());
  case NULL:
    return 0;
  case STRING: {
    // Go through Utf8 so every CharSequence with the same text hashes alike.
    final Object utf8 = (o instanceof Utf8) ? o : new Utf8(o.toString());
    return utf8.hashCode();
  }
  default:
    return o.hashCode();
  }
}
/** Add the hash code for an object into an accumulated hash code. */
protected int hashCodeAdd(int hashCode, Object o, Schema s) {
// Multiply-by-31 accumulation, with the element hashed according to its schema.
return 31 * hashCode + hashCode(o, s);
}
/**
* Compare objects according to their schema. If equal, return zero. If
* greater-than, return a positive value; if less-than, return a negative
* value. Order is consistent with that of
* {@link BinaryData#compare(byte[], int, byte[], int, Schema)}.
*/
public int compare(Object o1, Object o2, Schema s) {
// equals=false asks for a full ordering rather than an equality-only check.
return compare(o1, o2, s, false);
}
/**
* Comparison implementation. When equals is true, only checks for equality, not
* for order.
*/
@SuppressWarnings(value = "unchecked")
protected int compare(Object o1, Object o2, Schema s, boolean equals) {
// Same reference (including both null) is equal without looking at the schema.
if (o1 == o2)
return 0;
switch (s.getType()) {
case RECORD:
// Fields are compared in schema order; the first difference decides.
for (Field f : s.getFields()) {
if (f.order() == Field.Order.IGNORE)
continue; // ignore this field
int pos = f.pos();
String name = f.name();
int compare = compare(getField(o1, name, pos), getField(o2, name, pos), f.schema(), equals);
if (compare != 0) // not equal
// A DESCENDING field flips the sign of its comparison.
return f.order() == Field.Order.DESCENDING ? -compare : compare;
}
return 0;
case ENUM:
// Ordered by the symbol's position in the schema; the result is the ordinal
// difference, not necessarily -1/0/1.
return s.getEnumOrdinal(o1.toString()) - s.getEnumOrdinal(o2.toString());
case ARRAY:
Collection a1 = (Collection) o1;
Collection a2 = (Collection) o2;
Iterator e1 = a1.iterator();
Iterator e2 = a2.iterator();
Schema elementType = s.getElementType();
// Element-wise comparison over the common prefix.
while (e1.hasNext() && e2.hasNext()) {
int compare = compare(e1.next(), e2.next(), elementType, equals);
if (compare != 0)
return compare;
}
// Equal prefixes: the longer array sorts after the shorter one.
return e1.hasNext() ? 1 : (e2.hasNext() ? -1 : 0);
case MAP:
// Maps have no ordering; only the equality-only mode is supported, and it
// relies on the maps' own equals().
if (equals)
return o1.equals(o2) ? 0 : 1;
throw new AvroRuntimeException("Can't compare maps!");
case UNION:
// Different branches order by branch index; the same branch compares by
// that branch's schema.
int i1 = resolveUnion(s, o1);
int i2 = resolveUnion(s, o2);
return (i1 == i2) ? compare(o1, o2, s.getTypes().get(i1), equals) : Integer.compare(i1, i2);
case NULL:
return 0;
case STRING:
// Both sides are compared as Utf8, converting via toString() when needed.
Utf8 u1 = o1 instanceof Utf8 ? (Utf8) o1 : new Utf8(o1.toString());
Utf8 u2 = o2 instanceof Utf8 ? (Utf8) o2 : new Utf8(o2.toString());
return u1.compareTo(u2);
default:
// Remaining types fall back to the datum's own Comparable implementation.
return ((Comparable) o1).compareTo(o2);
}
}
// Decoded default values keyed by Field. The backing map holds its keys
// weakly and is wrapped so individual get/put calls are synchronized.
private final Map<Field, Object> defaultValueCache = Collections.synchronizedMap(new WeakHashMap<>());
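/**
* Gets the default value of the given field, if any.
*
* @param field the field whose default value should be retrieved.
* @return the default value associated with the given field; null when the
* default is a JSON null and the field's schema is null or a union whose
* first branch is null.
* @throws AvroMissingFieldException if the field has no default value.
*/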
@SuppressWarnings({ "unchecked" })
public Object getDefaultValue(Field field) {
  final JsonNode json = Accessor.defaultValue(field);
  if (json == null) {
    throw new AvroMissingFieldException("Field " + field + " not set and has no default value", field);
  }

  // A JSON null default needs no decoding when the schema is null itself or a
  // union that lists null first.
  if (json.isNull()) {
    final Type fieldType = field.schema().getType();
    final boolean nullSchema = fieldType == Type.NULL;
    if (nullSchema || (fieldType == Type.UNION && field.schema().getTypes().get(0).getType() == Type.NULL)) {
      return null;
    }
  }

  // Check the cache
  final Object cached = defaultValueCache.get(field);
  if (cached != null) {
    return cached;
  }

  // Not cached: obtain the Java value by binary-encoding the JSON default and
  // decoding it again with this data model's reader.
  try {
    final ByteArrayOutputStream encodedBytes = new ByteArrayOutputStream();
    final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(encodedBytes, null);
    Accessor.encode(encoder, field.schema(), json);
    encoder.flush();
    final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(encodedBytes.toByteArray(), null);
    final Object decoded = createDatumReader(field.schema()).read(null, decoder);
    // Two threads may both decode and both put; the last put wins, which is
    // harmless.
    defaultValueCache.put(field, decoded);
    return decoded;
  } catch (IOException e) {
    throw new AvroRuntimeException(e);
  }
}
// Shared STRING schema; deepCopyRaw passes it to deepCopy when copying map keys.
private static final Schema STRINGS = Schema.create(Type.STRING);
/**
 * Produces a deep copy of {@code value} according to {@code schema}.
 * <p>
 * When the schema carries a logical type and a conversion is registered for the
 * value's class, the value is converted to its raw form, copied, and converted
 * back. In every other case the value is copied directly in its raw form.
 *
 * @param schema the schema of the value to deep copy.
 * @param value  the value to deep copy; may be {@code null}.
 * @return a deep copy of the given value, or {@code null} if {@code value} is
 *         {@code null}.
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> T deepCopy(Schema schema, T value) {
  if (value == null) {
    return null;
  }

  final LogicalType logicalType = schema.getLogicalType();
  Conversion conversion = null;
  if (logicalType != null) {
    conversion = getConversionByClass(value.getClass(), logicalType);
  }

  if (conversion == null) {
    // Either not a logical type, or no conversion defined for it: raw copy.
    return (T) deepCopyRaw(schema, value);
  }

  // Logical type with a conversion: go through the raw representation.
  final Object rawValue = Conversions.convertToRawType(value, schema, logicalType, conversion);
  final Object rawCopy = deepCopyRaw(schema, rawValue);
  return (T) Conversions.convertToLogicalType(rawCopy, schema, logicalType, conversion);
}
/**
 * Deep-copies a value in its raw (non-logical) representation, dispatching on
 * the schema's type. Nested elements, map entries, record fields and union
 * branches are copied through {@link #deepCopy(Schema, Object)}.
 *
 * @param schema the schema describing {@code value}.
 * @param value  the raw value to copy; may be {@code null}.
 * @return the copy, the same instance for immutable values, or {@code null} for
 *         a {@code null} value or a NULL schema.
 * @throws AvroRuntimeException if the schema type is not handled.
 */
private Object deepCopyRaw(Schema schema, Object value) {
  if (value == null) {
    return null;
  }
  switch (schema.getType()) {
  case NULL:
    return null;

  case BOOLEAN:
  case INT:
  case LONG:
  case FLOAT:
  case DOUBLE:
    // Boxed primitives are immutable, so the same instance can be shared.
    return value;

  case STRING:
    if (value instanceof String) {
      // Strings are immutable
      return value;
    }
    if (value instanceof Utf8) {
      // The Utf8 copy constructor avoids a round trip through String.
      return new Utf8((Utf8) value);
    }
    // Other CharSequence implementations may be mutable: copy into a Utf8.
    return new Utf8(value.toString());

  case BYTES: {
    ByteBuffer source = (ByteBuffer) value;
    int origin = source.position();
    int remaining = source.limit() - origin;
    byte[] copied = new byte[remaining];
    source.get(copied, 0, remaining);
    // Reading advanced the position; put it back where it was.
    ((Buffer) source).position(origin);
    return ByteBuffer.wrap(copied, 0, remaining);
  }

  case ENUM:
    return createEnum(value.toString(), schema);

  case FIXED:
    return createFixed(null, ((GenericFixed) value).bytes(), schema);

  case ARRAY: {
    List<Object> elements = (List) value;
    List<Object> copiedElements = new GenericData.Array<>(elements.size(), schema);
    for (Object element : elements) {
      copiedElements.add(deepCopy(schema.getElementType(), element));
    }
    return copiedElements;
  }

  case MAP: {
    Map<CharSequence, Object> entries = (Map) value;
    Map<CharSequence, Object> copiedEntries = new HashMap<>(entries.size());
    for (Map.Entry<CharSequence, Object> e : entries.entrySet()) {
      copiedEntries.put(deepCopy(STRINGS, e.getKey()), deepCopy(schema.getValueType(), e.getValue()));
    }
    return copiedEntries;
  }

  case RECORD: {
    Object sourceState = getRecordState(value, schema);
    Object recordCopy = newRecord(null, schema);
    Object copyState = getRecordState(recordCopy, schema);
    for (Field f : schema.getFields()) {
      int position = f.pos();
      String fieldName = f.name();
      Object copiedField = deepCopy(f.schema(), getField(value, fieldName, position, sourceState));
      setField(recordCopy, fieldName, position, copiedField, copyState);
    }
    return recordCopy;
  }

  case UNION: {
    // Copy using the schema of whichever branch the value resolves to.
    int branch = resolveUnion(schema, value);
    return deepCopy(schema.getTypes().get(branch), value);
  }

  default:
    throw new AvroRuntimeException("Deep copy failed for schema \"" + schema + "\" and value \"" + value + "\"");
  }
}
/**
 * Called to create a fixed value. May be overridden for alternate fixed
 * representations.
 * <p>
 * By default, {@code old} is returned unchanged when it is a
 * {@link GenericFixed} whose byte array length equals the schema's fixed size;
 * otherwise a new {@link GenericData.Fixed} is created for the schema.
 *
 * @param old    a candidate instance to reuse; may be {@code null}.
 * @param schema the fixed schema.
 * @return the reused or newly created fixed value.
 */
public Object createFixed(Object old, Schema schema) {
  if (old instanceof GenericFixed) {
    final int existingLength = ((GenericFixed) old).bytes().length;
    if (existingLength == schema.getFixedSize()) {
      return old;
    }
  }
  return new GenericData.Fixed(schema);
}
/**
 * Called to create a fixed value initialized from the given bytes. May be
 * overridden for alternate fixed representations.
 * <p>
 * By default, obtains an instance from {@link #createFixed(Object, Schema)},
 * casts it to {@link GenericFixed}, and copies the first
 * {@code schema.getFixedSize()} bytes of {@code bytes} into its backing array.
 *
 * @param old    a candidate instance to reuse; may be {@code null}.
 * @param bytes  the source bytes to copy from.
 * @param schema the fixed schema.
 * @return the fixed value holding a copy of the bytes.
 */
public Object createFixed(Object old, byte[] bytes, Schema schema) {
  final GenericFixed target = (GenericFixed) createFixed(old, schema);
  final byte[] destination = target.bytes();
  System.arraycopy(bytes, 0, destination, 0, schema.getFixedSize());
  return target;
}
/**
 * Called to create an enum value. May be overridden for alternate enum
 * representations. By default, returns a new {@link EnumSymbol} built from the
 * schema and symbol.
 *
 * @param symbol the enum symbol name.
 * @param schema the enum schema.
 * @return the enum value.
 */
public Object createEnum(String symbol, Schema schema) {
  final EnumSymbol enumValue = new EnumSymbol(schema, symbol);
  return enumValue;
}
/**
 * Called to create new record instances. Subclasses may override to use a
 * different record implementation. The returned instance must conform to the
 * schema provided. If the old object contains fields not present in the schema,
 * they should either be removed from the old object, or a new instance that
 * conforms to the schema should be created.
 * <p>
 * By default, {@code old} is reused when it is an {@link IndexedRecord} whose
 * schema is the very same instance as {@code schema}; otherwise a new
 * {@link GenericData.Record} is returned.
 *
 * @param old    a candidate instance to reuse; may be {@code null}.
 * @param schema the record schema.
 * @return the reused or newly created record.
 */
public Object newRecord(Object old, Schema schema) {
  // Identity comparison on purpose: only reuse a record built for this exact
  // schema instance.
  final boolean reusable = old instanceof IndexedRecord && ((IndexedRecord) old).getSchema() == schema;
  return reusable ? old : new GenericData.Record(schema);
}
/**
 * Called to create new array instances. Subclasses may override to use a
 * different array implementation.
 * <p>
 * By default, an existing {@link GenericArray} is reset and reused, any other
 * existing {@link Collection} is cleared and reused, and otherwise a new
 * {@link GenericData.Array} is created.
 *
 * @param old    a candidate instance to reuse; may be {@code null}.
 * @param size   the size passed to the new array when one is created.
 * @param schema the array schema.
 * @return the reused or newly created array.
 */
public Object newArray(Object old, int size, Schema schema) {
  if (old instanceof GenericArray) {
    final GenericArray<?> recycledArray = (GenericArray<?>) old;
    recycledArray.reset();
    return recycledArray;
  }
  if (old instanceof Collection) {
    final Collection<?> recycledCollection = (Collection<?>) old;
    recycledCollection.clear();
    return recycledCollection;
  }
  return new GenericData.Array<Object>(size, schema);
}
/**
 * Called to create new map instances. Subclasses may override to use a
 * different map implementation.
 * <p>
 * By default, an existing {@link Map} is cleared and reused; otherwise a new
 * {@link HashMap} is created with {@code size} as its initial capacity.
 *
 * @param old  a candidate instance to reuse; may be {@code null}.
 * @param size the initial capacity for a newly created map.
 * @return the reused or newly created map.
 */
public Object newMap(Object old, int size) {
  if (!(old instanceof Map)) {
    return new HashMap<>(size);
  }
  final Map<?, ?> recycled = (Map<?, ?>) old;
  recycled.clear();
  return recycled;
}
/**
 * Creates a supplier that yields new record instances for a given schema in an
 * optimized way.
 * <p>
 * This default implementation ignores {@code schema} and returns a supplier
 * that delegates every call to {@link #newRecord(Object, Schema)}.
 *
 * @param schema the schema the supplier is requested for.
 * @return a supplier of record instances.
 */
public InstanceSupplier getNewRecordSupplier(Schema schema) {
  return (oldInstance, recordSchema) -> newRecord(oldInstance, recordSchema);
}
/**
 * Factory callback producing an instance for a schema, optionally reusing a
 * previously created one. It has a single abstract method, so it can be
 * implemented with a lambda or method reference.
 */
@FunctionalInterface
public interface InstanceSupplier {
  /**
   * Supplies an instance conforming to the given schema.
   *
   * @param oldInstance a candidate instance to reuse; may be {@code null}.
   * @param schema      the schema the instance must conform to.
   * @return the reused or newly created instance.
   */
  Object newInstance(Object oldInstance, Schema schema);
}
}