blob: ff603cd439e3afdb1ee6753df63965a950685e5c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.NullNode;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.avro.util.internal.Accessor;
import org.apache.avro.util.internal.Accessor.FieldAccessor;
import org.apache.avro.util.internal.JacksonUtils;
/**
* An abstract data type.
* <p>
* A schema may be one of:
* <ul>
* <li>A <i>record</i>, mapping field names to field value data;
* <li>An <i>enum</i>, containing one of a small set of symbols;
* <li>An <i>array</i> of values, all of the same schema;
* <li>A <i>map</i>, containing string/value pairs, of a declared schema;
* <li>A <i>union</i> of other schemas;
* <li>A <i>fixed</i> sized binary object;
* <li>A unicode <i>string</i>;
* <li>A sequence of <i>bytes</i>;
* <li>A 32-bit signed <i>int</i>;
* <li>A 64-bit signed <i>long</i>;
* <li>A 32-bit IEEE single-<i>float</i>;
* <li>A 64-bit IEEE <i>double</i>-float;
* <li>A <i>boolean</i>; or
* <li><i>null</i>.
* </ul>
*
* A schema can be constructed using one of its static <tt>createXXX</tt>
* methods, or more conveniently using {@link SchemaBuilder}. The schema objects
* are <i>logically</i> immutable. There are only two mutating methods -
* {@link #setFields(List)} and {@link #addProp(String, String)}. The following
* restrictions apply on these two methods.
* <ul>
* <li>{@link #setFields(List)}, can be called at most once. This method exists
* in order to enable clients to build recursive schemas.
* <li>{@link #addProp(String, String)} can be called with property names that
* are not present already. It is not possible to change or delete an existing
* property.
* </ul>
*/
public abstract class Schema extends JsonProperties implements Serializable {
private static final long serialVersionUID = 1L;

/**
 * Java-serialization hook: replaces this schema with a proxy object that
 * carries only the schema's JSON text, as produced by {@link #toString()}.
 *
 * @return a {@code SerializableSchema} holding the JSON rendering of this
 *         schema
 */
protected Object writeReplace() {
  final SerializableSchema proxy = new SerializableSchema();
  proxy.schemaString = toString();
  return proxy;
}
/**
 * Serialization proxy for a schema: stores the schema as JSON text and parses
 * that text again when the proxy is deserialized.
 */
private static final class SerializableSchema implements Serializable {
  private static final long serialVersionUID = 1L;

  /** JSON rendering of the schema this proxy stands in for. */
  private String schemaString;

  /** Rebuilds the schema by parsing {@link #schemaString} with a new parser. */
  private Object readResolve() {
    final Schema.Parser parser = new Schema.Parser();
    return parser.parse(schemaString);
  }
}
/** Shared Jackson factory; used in this class to create the generators that render schemas as JSON. */
static final JsonFactory FACTORY = new JsonFactory();
/** Shared Jackson object mapper built on top of {@link #FACTORY}. */
static final ObjectMapper MAPPER = new ObjectMapper(FACTORY);
/** Sentinel stored in the {@code hashCode} field meaning "no hash has been cached yet". */
private static final int NO_HASHCODE = Integer.MIN_VALUE;
static {
// Enable the parser feature that tolerates comments in JSON input, and
// install the mapper as the factory's codec.
FACTORY.enable(JsonParser.Feature.ALLOW_COMMENTS);
FACTORY.setCodec(MAPPER);
}
/** The type of a schema. */
public enum Type {
  RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL;

  /** Lower-case form of the constant's identifier, e.g. {@code "record"}. */
  private final String name;

  Type() {
    // Locale.ENGLISH keeps the result independent of the default locale.
    name = name().toLowerCase(Locale.ENGLISH);
  }

  /** Returns the lower-case name of this type. */
  public String getName() {
    return name;
  }
}
/** The kind of schema this instance represents; assigned once in the constructor. */
private final Type type;
/** Logical type attached to this schema, or null when none has been set. */
private LogicalType logicalType = null;
/**
* Creates a schema of the given type. {@code ENUM_RESERVED} is handed to the
* {@link JsonProperties} constructor for enum schemas and
* {@code SCHEMA_RESERVED} for every other type (presumably the property names
* that may not be used as custom properties - confirm in JsonProperties).
*/
Schema(Type type) {
super(type == Type.ENUM ? ENUM_RESERVED : SCHEMA_RESERVED);
this.type = type;
}
/**
 * Create a schema for a primitive type.
 *
 * @param type one of the primitive types (string, bytes, int, long, float,
 *             double, boolean or null)
 * @return a new schema instance of the requested type
 * @throws AvroRuntimeException if {@code type} is not a primitive type
 */
public static Schema create(Type type) {
  final Schema primitive;
  switch (type) {
  case NULL:
    primitive = new NullSchema();
    break;
  case BOOLEAN:
    primitive = new BooleanSchema();
    break;
  case INT:
    primitive = new IntSchema();
    break;
  case LONG:
    primitive = new LongSchema();
    break;
  case FLOAT:
    primitive = new FloatSchema();
    break;
  case DOUBLE:
    primitive = new DoubleSchema();
    break;
  case BYTES:
    primitive = new BytesSchema();
    break;
  case STRING:
    primitive = new StringSchema();
    break;
  default:
    throw new AvroRuntimeException("Can't create a: " + type);
  }
  return primitive;
}
/** Attribute names handed to {@link JsonProperties} as reserved for every non-enum schema. */
private static final Set<String> SCHEMA_RESERVED = new HashSet<>(
Arrays.asList("doc", "fields", "items", "name", "namespace", "size", "symbols", "values", "type", "aliases"));
/** Reserved names for enum schemas: everything in {@link #SCHEMA_RESERVED} plus {@code "default"}. */
private static final Set<String> ENUM_RESERVED = new HashSet<>(SCHEMA_RESERVED);
static {
ENUM_RESERVED.add("default");
}
/** Cached hash code, or {@code NO_HASHCODE} when it has to be (re)computed. */
int hashCode = NO_HASHCODE;
/**
* Adds a string-valued property through {@link JsonProperties} and then
* discards the cached hash code.
*/
@Override
public void addProp(String name, String value) {
super.addProp(name, value);
hashCode = NO_HASHCODE;
}
/**
* Adds an object-valued property through {@link JsonProperties} and then
* discards the cached hash code.
*/
@Override
public void addProp(String name, Object value) {
super.addProp(name, value);
hashCode = NO_HASHCODE;
}
/** Returns the logical type attached to this schema, or null if none was set. */
public LogicalType getLogicalType() {
return logicalType;
}
/** Attaches a logical type to this schema; the cached hash code is left untouched. */
void setLogicalType(LogicalType logicalType) {
this.logicalType = logicalType;
}
/**
 * Create an anonymous record schema whose fields are set immediately.
 *
 * @param fields the fields of the record
 * @return a record schema without name, namespace or documentation
 * @deprecated This method allows to create Schema objects that cannot be parsed
 *             by {@link Schema.Parser#parse(String)}. It will be removed in a
 *             future version of Avro. Better use
 *             {@link #createRecord(String, String, String, boolean, List)} to
 *             produce a fully qualified Schema.
 */
@Deprecated
public static Schema createRecord(List<Field> fields) {
  // The five-argument factory builds the record and sets its fields in one go.
  return createRecord(null, null, null, false, fields);
}
/** Create a named record schema whose fields are still to be set. */
public static Schema createRecord(String name, String doc, String namespace, boolean isError) {
  final Name recordName = new Name(name, namespace);
  return new RecordSchema(recordName, doc, isError);
}

/** Create a named record schema with fields already set. */
public static Schema createRecord(String name, String doc, String namespace, boolean isError, List<Field> fields) {
  final Name recordName = new Name(name, namespace);
  return new RecordSchema(recordName, doc, isError, fields);
}

/** Create an enum schema without a default symbol. */
public static Schema createEnum(String name, String doc, String namespace, List<String> values) {
  final Name enumName = new Name(name, namespace);
  final LockableArrayList<String> symbols = new LockableArrayList<>(values);
  return new EnumSchema(enumName, doc, symbols, null);
}

/** Create an enum schema with the given default symbol. */
public static Schema createEnum(String name, String doc, String namespace, List<String> values, String enumDefault) {
  final Name enumName = new Name(name, namespace);
  final LockableArrayList<String> symbols = new LockableArrayList<>(values);
  return new EnumSchema(enumName, doc, symbols, enumDefault);
}

/** Create an array schema with the given element schema. */
public static Schema createArray(Schema elementType) {
  final Schema array = new ArraySchema(elementType);
  return array;
}

/** Create a map schema with the given value schema. */
public static Schema createMap(Schema valueType) {
  final Schema map = new MapSchema(valueType);
  return map;
}

/** Create a union schema from a list of branch schemas. */
public static Schema createUnion(List<Schema> types) {
  final LockableArrayList<Schema> branches = new LockableArrayList<>(types);
  return new UnionSchema(branches);
}

/** Create a union schema from the given branch schemas. */
public static Schema createUnion(Schema... types) {
  final List<Schema> branches = new LockableArrayList<>(types);
  return createUnion(branches);
}

/** Create a fixed schema of {@code size} bytes. */
public static Schema createFixed(String name, String doc, String space, int size) {
  final Name fixedName = new Name(name, space);
  return new FixedSchema(fixedName, doc, size);
}
/** Return the type of this schema. */
public Type getType() {
return type;
}
/**
* If this is a record, returns the Field with the given name
* <tt>fieldname</tt>. If there is no field by that name, a <tt>null</tt> is
* returned.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not a record
*/
public Field getField(String fieldname) {
throw new AvroRuntimeException("Not a record: " + this);
}
/**
* If this is a record, returns the fields in it. The returned list is in the
* order of their positions.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not a record
*/
public List<Field> getFields() {
throw new AvroRuntimeException("Not a record: " + this);
}
/**
* If this is a record, set its fields. The fields can be set only once in a
* schema.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not a record
*/
public void setFields(List<Field> fields) {
throw new AvroRuntimeException("Not a record: " + this);
}
/**
* If this is an enum, return its symbols.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not an enum
*/
public List<String> getEnumSymbols() {
throw new AvroRuntimeException("Not an enum: " + this);
}
/**
* If this is an enum, return its default value.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not an enum
*/
public String getEnumDefault() {
throw new AvroRuntimeException("Not an enum: " + this);
}
/**
* If this is an enum, return a symbol's ordinal value.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not an enum
*/
public int getEnumOrdinal(String symbol) {
throw new AvroRuntimeException("Not an enum: " + this);
}
/**
* If this is an enum, returns true if it contains given symbol.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not an enum
*/
public boolean hasEnumSymbol(String symbol) {
throw new AvroRuntimeException("Not an enum: " + this);
}
/**
* If this is a record, enum or fixed, returns its name, otherwise the name of
* the primitive type.
*/
public String getName() {
// Base implementation: the lower-case name of the schema type.
return type.name;
}
/**
* If this is a record, enum, or fixed, returns its docstring, if available.
* Otherwise, returns null.
*/
public String getDoc() {
return null;
}
/**
* If this is a record, enum or fixed, returns its namespace, if any.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not a named type
*/
public String getNamespace() {
throw new AvroRuntimeException("Not a named type: " + this);
}
/**
* If this is a record, enum or fixed, returns its namespace-qualified name,
* otherwise returns the name of the primitive type.
*/
public String getFullName() {
return getName();
}
/**
* If this is a record, enum or fixed, add an alias.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not a named type
*/
public void addAlias(String alias) {
throw new AvroRuntimeException("Not a named type: " + this);
}
/**
* If this is a record, enum or fixed, add an alias in the given namespace.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not a named type
*/
public void addAlias(String alias, String space) {
throw new AvroRuntimeException("Not a named type: " + this);
}
/**
* If this is a record, enum or fixed, return its aliases, if any.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not a named type
*/
public Set<String> getAliases() {
throw new AvroRuntimeException("Not a named type: " + this);
}
/**
* Returns true if this record is an error type.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not a record
*/
public boolean isError() {
throw new AvroRuntimeException("Not a record: " + this);
}
/**
* If this is an array, returns its element type.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not an array
*/
public Schema getElementType() {
throw new AvroRuntimeException("Not an array: " + this);
}
/**
* If this is a map, returns its value type.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not a map
*/
public Schema getValueType() {
throw new AvroRuntimeException("Not a map: " + this);
}
/**
* If this is a union, returns its types.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not a union
*/
public List<Schema> getTypes() {
throw new AvroRuntimeException("Not a union: " + this);
}
/**
* If this is a union, return the index of the branch with the provided full
* name, or null if the union has no branch with that name.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not a union
*/
public Integer getIndexNamed(String name) {
throw new AvroRuntimeException("Not a union: " + this);
}
/**
* If this is fixed, returns its size.
*
* @throws AvroRuntimeException in this base implementation, i.e. when this
*                              schema is not fixed
*/
public int getFixedSize() {
throw new AvroRuntimeException("Not fixed: " + this);
}
/** Render this as compact <a href="https://json.org/">JSON</a>. */
@Override
public String toString() {
  return toString(false);
}

/**
 * Render this as <a href="https://json.org/">JSON</a>.
 *
 * @param pretty if true, pretty-print JSON.
 */
public String toString(boolean pretty) {
  final Names noKnownNames = new Names();
  return toString(noKnownNames, pretty);
}

/**
 * Render this as <a href="https://json.org/">JSON</a>, but without inlining the
 * referenced schemas.
 *
 * @param referencedSchemas referenced schemas; may be null, which is treated
 *                          like an empty collection
 * @param pretty            if true, pretty-print JSON.
 */
// Use at your own risk. This method should be removed with AVRO-2832.
@Deprecated
public String toString(Collection<Schema> referencedSchemas, boolean pretty) {
  final Schema.Names knownNames = new Schema.Names();
  if (referencedSchemas == null) {
    return toString(knownNames, pretty);
  }
  for (Schema referenced : referencedSchemas) {
    knownNames.add(referenced);
  }
  return toString(knownNames, pretty);
}

/**
 * Renders this schema as JSON text into an in-memory buffer.
 *
 * @param names  the names passed on to {@code toJson}
 * @param pretty if true, the generator's default pretty printer is used
 * @throws AvroRuntimeException wrapping any {@link IOException} raised while
 *                              generating the JSON
 */
String toString(Names names, boolean pretty) {
  final StringWriter buffer = new StringWriter();
  try {
    final JsonGenerator generator = FACTORY.createGenerator(buffer);
    if (pretty) {
      generator.useDefaultPrettyPrinter();
    }
    toJson(names, generator);
    generator.flush();
  } catch (IOException e) {
    throw new AvroRuntimeException(e);
  }
  return buffer.toString();
}
/**
 * Writes this schema to {@code gen}. A schema with properties is written as an
 * object with a {@code "type"} field followed by the properties; a schema
 * without properties is written as just its name.
 */
void toJson(Names names, JsonGenerator gen) throws IOException {
  if (hasProps()) {
    gen.writeStartObject();
    gen.writeStringField("type", getName());
    writeProps(gen);
    gen.writeEndObject();
    return;
  }
  // no props defined: the bare name is enough
  gen.writeString(getName());
}

/**
 * Writes the fields of a record schema.
 *
 * @throws AvroRuntimeException in this base implementation, i.e. when this
 *                              schema is not a record
 */
void fieldsToJson(Names names, JsonGenerator gen) throws IOException {
  throw new AvroRuntimeException("Not a record: " + this);
}
/**
 * Two schemas are equal here when they have the same type, their cached hash
 * codes do not contradict each other, and their properties are equal.
 */
@Override
public boolean equals(Object o) {
  if (this == o) {
    return true;
  }
  if (!(o instanceof Schema)) {
    return false;
  }
  final Schema other = (Schema) o;
  if (this.type != other.type) {
    return false;
  }
  return equalCachedHash(other) && propsEqual(other);
}

/** Returns the cached hash code, computing and caching it first when needed. */
@Override
public final int hashCode() {
  if (hashCode != NO_HASHCODE) {
    return hashCode;
  }
  hashCode = computeHash();
  return hashCode;
}

/** Computes the hash from the schema type and the properties. */
int computeHash() {
  final int typeHash = getType().hashCode();
  return typeHash + propsHashCode();
}

/**
 * Cheap pre-check for equality: returns false only when both schemas have a
 * cached hash code and the two values differ.
 */
final boolean equalCachedHash(Schema other) {
  if (hashCode == NO_HASHCODE || other.hashCode == NO_HASHCODE) {
    return true;
  }
  return hashCode == other.hashCode;
}
/** Unmodifiable set of attribute names handed to {@link JsonProperties} as reserved for record fields. */
private static final Set<String> FIELD_RESERVED = Collections
.unmodifiableSet(new HashSet<>(Arrays.asList("default", "doc", "name", "order", "type", "aliases")));
/** Returns true if this schema is a union. */
public boolean isUnion() {
  return this instanceof UnionSchema;
}

/**
 * Returns true if this schema is the null type, or a union with at least one
 * nullable branch.
 */
public boolean isNullable() {
  if (isUnion()) {
    for (Schema branch : getTypes()) {
      if (branch.isNullable()) {
        return true;
      }
    }
    return false;
  }
  return getType().equals(Schema.Type.NULL);
}
/** A field within a record. */
public static class Field extends JsonProperties {
static {
// Register an accessor that exposes the package-private default-value getter
// and the package-private constructor of this class.
Accessor.setAccessor(new FieldAccessor() {
@Override
protected JsonNode defaultValue(Field field) {
return field.defaultValue();
}
@Override
protected Field createField(String name, Schema schema, String doc, JsonNode defaultValue) {
return new Field(name, schema, doc, defaultValue, true, Order.ASCENDING);
}
@Override
protected Field createField(String name, Schema schema, String doc, JsonNode defaultValue, boolean validate,
Order order) {
return new Field(name, schema, doc, defaultValue, validate, order);
}
});
}
/** How values of this field should be ordered when sorting records. */
public enum Order {
ASCENDING, DESCENDING, IGNORE;
// Lower-case form of the constant's identifier; written as the "order"
// attribute when a record's fields are rendered as JSON.
private final String name;
private Order() {
this.name = this.name().toLowerCase(Locale.ENGLISH);
}
};
/**
* For Schema unions with a "null" type as the first entry, this can be used to
* specify that the default for the union is null.
*/
public static final Object NULL_DEFAULT_VALUE = new Object();
private final String name; // name of the field.
private int position = -1; // index within the record; -1 until a record's setFields assigns it
private final Schema schema;
private final String doc;
private final JsonNode defaultValue; // null means "no default value"
private final Order order;
private Set<String> aliases; // created lazily by addAlias
/**
* Creates a field.
*
* @param name            the field name; passed through {@code validateName}
* @param schema          the schema of the field's values
* @param doc             documentation, may be null
* @param defaultValue    the default value as a JSON node, or null for none
* @param validateDefault if true, the default value is passed through
*                        {@code validateDefault} before being stored
* @param order           the sort order; must not be null
*/
Field(String name, Schema schema, String doc, JsonNode defaultValue, boolean validateDefault, Order order) {
super(FIELD_RESERVED);
this.name = validateName(name);
this.schema = schema;
this.doc = doc;
this.defaultValue = validateDefault ? validateDefault(name, schema, defaultValue) : defaultValue;
this.order = Objects.requireNonNull(order, "Order cannot be null");
}
/**
* Constructs a new Field instance with the same {@code name}, {@code doc},
* {@code defaultValue}, and {@code order} as {@code field} has with changing
* the schema to the specified one. It also copies all the {@code props} and
* {@code aliases}.
*/
public Field(Field field, Schema schema) {
this(field.name, schema, field.doc, field.defaultValue, true, field.order);
putAll(field);
if (field.aliases != null)
aliases = new LinkedHashSet<>(field.aliases);
}
/**
* Creates a field without documentation and without a default value, ordered
* {@link Order#ASCENDING}.
*/
public Field(String name, Schema schema) {
this(name, schema, (String) null, (JsonNode) null, true, Order.ASCENDING);
}
/**
* Creates a documented field without a default value, ordered
* {@link Order#ASCENDING}.
*/
public Field(String name, Schema schema, String doc) {
this(name, schema, doc, (JsonNode) null, true, Order.ASCENDING);
}
/**
* @param defaultValue the default value for this field specified using the
* mapping in {@link JsonProperties}
*/
public Field(String name, Schema schema, String doc, Object defaultValue) {
// NULL_DEFAULT_VALUE is the marker for an explicit JSON null default.
this(name, schema, doc,
defaultValue == NULL_DEFAULT_VALUE ? NullNode.getInstance() : JacksonUtils.toJsonNode(defaultValue), true,
Order.ASCENDING);
}
/**
* @param defaultValue the default value for this field specified using the
* mapping in {@link JsonProperties}
*/
public Field(String name, Schema schema, String doc, Object defaultValue, Order order) {
// NULL_DEFAULT_VALUE is the marker for an explicit JSON null default.
this(name, schema, doc,
defaultValue == NULL_DEFAULT_VALUE ? NullNode.getInstance() : JacksonUtils.toJsonNode(defaultValue), true,
Objects.requireNonNull(order));
}
/** The name of this field. */
public String name() {
return name;
};
/** The position of this field within the record. */
public int pos() {
return position;
}
/** This field's {@link Schema}. */
public Schema schema() {
return schema;
}
/** Field's documentation within the record, if set. May return null. */
public String doc() {
return doc;
}
/**
* @return true if this Field has a default value set. Can be used to determine
* if a "null" return from defaultVal() is due to that being the default
* value or just not set.
*/
public boolean hasDefaultValue() {
return defaultValue != null;
}
/** Returns the default value as a JSON node, or null if no default is set. */
JsonNode defaultValue() {
return defaultValue;
}
/**
* @return the default value for this field specified using the mapping in
* {@link JsonProperties}
*/
public Object defaultVal() {
return JacksonUtils.toObject(defaultValue, schema);
}
/** The sort order of this field. */
public Order order() {
return order;
}
/** Adds an alias for this field, creating the alias set on first use. */
public void addAlias(String alias) {
if (aliases == null)
this.aliases = new LinkedHashSet<>();
aliases.add(alias);
}
/** Return the defined aliases as an unmodifiable Set. */
public Set<String> aliases() {
if (aliases == null)
return Collections.emptySet();
return Collections.unmodifiableSet(aliases);
}
/**
* Fields are equal when name, schema, default value, order and properties are
* equal. Position, documentation and aliases are not compared.
*/
@Override
public boolean equals(Object other) {
if (other == this)
return true;
if (!(other instanceof Field))
return false;
Field that = (Field) other;
return (name.equals(that.name)) && (schema.equals(that.schema)) && defaultValueEquals(that.defaultValue)
&& (order == that.order) && propsEqual(that);
}
/** Hash of the field name plus the (uncached) hash of the field schema. */
@Override
public int hashCode() {
return name.hashCode() + schema.computeHash();
}
/**
* Compares this field's default value with another one. Absent defaults only
* equal absent defaults; two defaults whose {@code doubleValue()} is NaN are
* treated as equal; everything else is compared with {@code equals}.
*/
private boolean defaultValueEquals(JsonNode thatDefaultValue) {
if (defaultValue == null)
return thatDefaultValue == null;
if (thatDefaultValue == null)
return false;
if (Double.isNaN(defaultValue.doubleValue()))
return Double.isNaN(thatDefaultValue.doubleValue());
return defaultValue.equals(thatDefaultValue);
}
/** Short description: name, schema type and position. */
@Override
public String toString() {
return name + " type:" + schema.type + " pos:" + position;
}
}
/**
 * The name of a named schema: simple name, namespace and the full name built
 * from the two. All three are null for an anonymous schema.
 */
static class Name {
  private final String name;
  private final String space;
  private final String full;

  /**
   * Builds a name from a possibly dotted name and a namespace. When
   * {@code name} contains a dot, everything before the last dot is used as the
   * namespace and {@code space} is ignored. An empty namespace is stored as
   * null.
   */
  public Name(String name, String space) {
    if (name == null) { // anonymous
      this.name = null;
      this.space = null;
      this.full = null;
      return;
    }
    final int lastDot = name.lastIndexOf('.');
    String namespace;
    if (lastDot >= 0) { // qualified name: take the namespace from it
      namespace = name.substring(0, lastDot);
      this.name = validateName(name.substring(lastDot + 1));
    } else { // unqualified name
      namespace = space;
      this.name = validateName(name);
    }
    if ("".equals(namespace)) {
      namespace = null;
    }
    this.space = namespace;
    this.full = (namespace == null) ? this.name : namespace + "." + this.name;
  }

  /** Names are equal when their full names are equal. */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (o instanceof Name) {
      final Name other = (Name) o;
      return Objects.equals(full, other.full);
    }
    return false;
  }

  @Override
  public int hashCode() {
    if (full == null) {
      return 0;
    }
    return full.hashCode();
  }

  /** Returns the full name; null for an anonymous name. */
  @Override
  public String toString() {
    return full;
  }

  /**
   * Writes the {@code "name"} field and, when the namespace differs from the
   * current default namespace of {@code names}, the {@code "namespace"} field.
   */
  public void writeName(Names names, JsonGenerator gen) throws IOException {
    if (name != null) {
      gen.writeStringField("name", name);
    }
    if (space == null) {
      if (names.space() != null) { // null within non-null
        gen.writeStringField("namespace", "");
      }
    } else if (!space.equals(names.space())) {
      gen.writeStringField("namespace", space);
    }
  }

  /**
   * Returns the simple name when this name has no namespace or lives in
   * {@code defaultSpace}; otherwise the full name.
   */
  public String getQualified(String defaultSpace) {
    if (space == null || space.equals(defaultSpace)) {
      return name;
    }
    return full;
  }
}
/** Common base of the schemas that carry a name: record, enum and fixed. */
private static abstract class NamedSchema extends Schema {
  final Name name;
  final String doc;
  Set<Name> aliases;

  /**
   * @throws AvroTypeException if the full name is the name of a primitive type
   */
  public NamedSchema(Type type, Name name, String doc) {
    super(type);
    this.name = name;
    this.doc = doc;
    final boolean clashesWithPrimitive = PRIMITIVES.containsKey(name.full);
    if (clashesWithPrimitive) {
      throw new AvroTypeException("Schemas may not be named after primitives: " + name.full);
    }
  }

  @Override
  public String getName() {
    return name.name;
  }

  @Override
  public String getDoc() {
    return doc;
  }

  @Override
  public String getNamespace() {
    return name.space;
  }

  @Override
  public String getFullName() {
    return name.full;
  }

  /** Adds an alias in this schema's own namespace. */
  @Override
  public void addAlias(String alias) {
    addAlias(alias, null);
  }

  /** Adds an alias; a null {@code space} means this schema's own namespace. */
  @Override
  public void addAlias(String name, String space) {
    if (aliases == null) {
      aliases = new LinkedHashSet<>();
    }
    final String aliasSpace = (space != null) ? space : this.name.space;
    aliases.add(new Name(name, aliasSpace));
  }

  /** Returns the full names of all aliases as a new set, in insertion order. */
  @Override
  public Set<String> getAliases() {
    final Set<String> fullNames = new LinkedHashSet<>();
    if (aliases == null) {
      return fullNames;
    }
    for (Name alias : aliases) {
      fullNames.add(alias.full);
    }
    return fullNames;
  }

  /**
   * Writes a reference to this schema by name if an equal schema is already
   * registered under this name in {@code names}; otherwise registers this
   * schema (unless it is anonymous) and writes nothing.
   *
   * @return true if a name reference was written
   */
  public boolean writeNameRef(Names names, JsonGenerator gen) throws IOException {
    final boolean alreadyDefined = this.equals(names.get(name));
    if (alreadyDefined) {
      gen.writeString(name.getQualified(names.space()));
      return true;
    }
    if (name.name != null) {
      names.put(name, this);
    }
    return false;
  }

  /** Writes this schema's name (and namespace where needed). */
  public void writeName(Names names, JsonGenerator gen) throws IOException {
    name.writeName(names, gen);
  }

  /** Returns true if both schemas have equal names. */
  public boolean equalNames(NamedSchema that) {
    return this.name.equals(that.name);
  }

  @Override
  int computeHash() {
    final int baseHash = super.computeHash();
    return baseHash + name.hashCode();
  }

  /** Writes the {@code "aliases"} array; nothing is written without aliases. */
  public void aliasesToJson(JsonGenerator gen) throws IOException {
    if (aliases == null || aliases.isEmpty()) {
      return;
    }
    gen.writeFieldName("aliases");
    gen.writeStartArray();
    for (Name alias : aliases) {
      gen.writeString(alias.getQualified(name.space));
    }
    gen.writeEndArray();
  }
}
/**
 * Useful as key of {@link Map}s when traversing two schemas at the same time
 * and need to watch for recursion.
 * <p>
 * Both members are compared and hashed by identity, not with {@code equals}.
 */
public static class SeenPair {
  private final Object s1;
  private final Object s2;

  public SeenPair(Object s1, Object s2) {
    this.s1 = s1;
    this.s2 = s2;
  }

  /**
   * Two pairs are equal when they hold the very same two objects in the same
   * order.
   */
  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SeenPair)) {
      return false;
    }
    final SeenPair that = (SeenPair) o;
    return this.s1 == that.s1 && this.s2 == that.s2;
  }

  /** Sum of the identity hash codes of both members. */
  @Override
  public int hashCode() {
    return System.identityHashCode(s1) + System.identityHashCode(s2);
  }
}
// Per-thread set of SeenPair entries for record comparisons currently in
// progress; lets RecordSchema.equals stop when a recursive schema leads back
// to a pair that is already being compared.
// NOTE(review): raw Set - could likely be ThreadLocal<Set<SeenPair>>; confirm
// against all usages in the file.
private static final ThreadLocal<Set> SEEN_EQUALS = ThreadLocal.withInitial(HashSet::new);
// Per-thread identity map of record schemas whose hash is currently being
// computed; lets RecordSchema.computeHash stop on recursive schemas.
// NOTE(review): raw Map - could likely be ThreadLocal<Map<Schema, Schema>>;
// confirm against all usages in the file.
private static final ThreadLocal<Map> SEEN_HASHCODE = ThreadLocal.withInitial(IdentityHashMap::new);
/**
 * Schema of a record (or error): a named, ordered list of fields. The fields
 * may be supplied at construction or set once later, which makes it possible to
 * build recursive schemas.
 */
@SuppressWarnings(value = "unchecked")
private static class RecordSchema extends NamedSchema {
  /** Locked field list in position order; null until the fields are set. */
  private List<Field> fields;
  /** Fields by name; null until the fields have been set successfully. */
  private Map<String, Field> fieldMap;
  private final boolean isError;

  /** Creates a record whose fields are still to be set. */
  public RecordSchema(Name name, String doc, boolean isError) {
    super(Type.RECORD, name, doc);
    this.isError = isError;
  }

  /** Creates a record and sets its fields right away. */
  public RecordSchema(Name name, String doc, boolean isError, List<Field> fields) {
    super(Type.RECORD, name, doc);
    this.isError = isError;
    setFields(fields);
  }

  @Override
  public boolean isError() {
    return isError;
  }

  /**
   * @return the field with the given name, or null if there is none
   * @throws AvroRuntimeException if the fields have not been set yet
   */
  @Override
  public Field getField(String fieldname) {
    if (fieldMap == null)
      throw new AvroRuntimeException("Schema fields not set yet");
    return fieldMap.get(fieldname);
  }

  /**
   * @return the locked list of fields, in position order
   * @throws AvroRuntimeException if the fields have not been set yet
   */
  @Override
  public List<Field> getFields() {
    if (fields == null)
      throw new AvroRuntimeException("Schema fields not set yet");
    return fields;
  }

  /**
   * Sets the fields of this record and assigns each field its position.
   *
   * @throws AvroRuntimeException if the fields were already set, if a field
   *                              already has a position (it belongs to another
   *                              record), or if two fields share a name
   */
  @Override
  public void setFields(List<Field> fields) {
    if (this.fields != null) {
      throw new AvroRuntimeException("Fields are already set");
    }
    int i = 0;
    // Build the lookup map locally and publish it only on success, so a
    // rejected field list never leaves a half-populated map behind.
    Map<String, Field> byName = new HashMap<>(Math.multiplyExact(2, fields.size()));
    LockableArrayList<Field> ff = new LockableArrayList<>(fields.size());
    for (Field f : fields) {
      if (f.position != -1) {
        throw new AvroRuntimeException("Field already used: " + f);
      }
      f.position = i++;
      final Field existingField = byName.put(f.name(), f);
      if (existingField != null) {
        throw new AvroRuntimeException(
            String.format("Duplicate field %s in record %s: %s and %s.", f.name(), name, f, existingField));
      }
      ff.add(f);
    }
    this.fieldMap = byName;
    this.fields = ff.lock();
    this.hashCode = NO_HASHCODE; // the fields take part in the hash
  }

  /**
   * Records are equal when names, properties and field lists are equal. A
   * per-thread set of the pairs currently being compared breaks the recursion
   * that recursive schemas would otherwise cause.
   */
  @Override
  public boolean equals(Object o) {
    if (o == this)
      return true;
    if (!(o instanceof RecordSchema))
      return false;
    RecordSchema that = (RecordSchema) o;
    if (!equalCachedHash(that))
      return false;
    if (!equalNames(that))
      return false;
    if (!propsEqual(that))
      return false;
    Set seen = SEEN_EQUALS.get();
    SeenPair here = new SeenPair(this, o);
    if (seen.contains(here))
      return true; // prevent stack overflow
    // Only the outermost comparison on this thread clears the set again.
    boolean first = seen.isEmpty();
    try {
      seen.add(here);
      return Objects.equals(fields, that.fields);
    } finally {
      if (first)
        seen.clear();
    }
  }

  /**
   * Hash of the named-schema part plus the field list. A record that is already
   * being hashed on this thread contributes 0, which breaks recursion. A record
   * whose fields are not set yet hashes as if it had no field contribution,
   * matching {@code equals}, which also tolerates unset fields.
   */
  @Override
  int computeHash() {
    Map seen = SEEN_HASHCODE.get();
    if (seen.containsKey(this))
      return 0; // prevent stack overflow
    // Only the outermost computation on this thread clears the map again.
    boolean first = seen.isEmpty();
    try {
      seen.put(this, this);
      return super.computeHash() + Objects.hashCode(fields);
    } finally {
      if (first)
        seen.clear();
    }
  }

  /**
   * Writes this record as JSON: a name reference if it was already written,
   * otherwise the full definition. While the definition is written, this
   * record's namespace is the default namespace of {@code names}.
   */
  @Override
  void toJson(Names names, JsonGenerator gen) throws IOException {
    if (writeNameRef(names, gen))
      return;
    String savedSpace = names.space; // save namespace
    gen.writeStartObject();
    gen.writeStringField("type", isError ? "error" : "record");
    writeName(names, gen);
    names.space = name.space; // set default namespace
    if (getDoc() != null)
      gen.writeStringField("doc", getDoc());
    if (fields != null) {
      gen.writeFieldName("fields");
      fieldsToJson(names, gen);
    }
    writeProps(gen);
    aliasesToJson(gen);
    gen.writeEndObject();
    names.space = savedSpace; // restore namespace
  }

  /**
   * Writes the fields as a JSON array of objects. The default value, a
   * non-ascending order and aliases are written only when present.
   */
  @Override
  void fieldsToJson(Names names, JsonGenerator gen) throws IOException {
    gen.writeStartArray();
    for (Field f : fields) {
      gen.writeStartObject();
      gen.writeStringField("name", f.name());
      gen.writeFieldName("type");
      f.schema().toJson(names, gen);
      if (f.doc() != null)
        gen.writeStringField("doc", f.doc());
      if (f.hasDefaultValue()) {
        gen.writeFieldName("default");
        gen.writeTree(f.defaultValue());
      }
      if (f.order() != Field.Order.ASCENDING)
        gen.writeStringField("order", f.order().name);
      if (f.aliases != null && f.aliases.size() != 0) {
        gen.writeFieldName("aliases");
        gen.writeStartArray();
        for (String alias : f.aliases)
          gen.writeString(alias);
        gen.writeEndArray();
      }
      f.writeProps(gen);
      gen.writeEndObject();
    }
    gen.writeEndArray();
  }
}
/** Schema of an enum: a named, ordered list of distinct symbols. */
private static class EnumSchema extends NamedSchema {
  private final List<String> symbols;
  private final Map<String, Integer> ordinals;
  private final String enumDefault;

  /**
   * @throws SchemaParseException if a symbol occurs twice, or if
   *                              {@code enumDefault} is given but is not one of
   *                              the symbols
   */
  public EnumSchema(Name name, String doc, LockableArrayList<String> symbols, String enumDefault) {
    super(Type.ENUM, name, doc);
    this.symbols = symbols.lock();
    this.ordinals = new HashMap<>(Math.multiplyExact(2, symbols.size()));
    this.enumDefault = enumDefault;
    int ordinal = 0;
    for (String symbol : symbols) {
      final Integer previous = ordinals.put(validateName(symbol), ordinal);
      ordinal++;
      if (previous != null) {
        throw new SchemaParseException("Duplicate enum symbol: " + symbol);
      }
    }
    final boolean unknownDefault = enumDefault != null && !symbols.contains(enumDefault);
    if (unknownDefault) {
      throw new SchemaParseException(
          "The Enum Default: " + enumDefault + " is not in the enum symbol set: " + symbols);
    }
  }

  @Override
  public List<String> getEnumSymbols() {
    return symbols;
  }

  @Override
  public boolean hasEnumSymbol(String symbol) {
    return ordinals.containsKey(symbol);
  }

  /** Returns the position of {@code symbol} in the symbol list. */
  @Override
  public int getEnumOrdinal(String symbol) {
    return ordinals.get(symbol);
  }

  /** Enums are equal when names, symbols and properties are equal. */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof EnumSchema)) {
      return false;
    }
    final EnumSchema other = (EnumSchema) o;
    if (!equalCachedHash(other)) {
      return false;
    }
    if (!equalNames(other)) {
      return false;
    }
    return symbols.equals(other.symbols) && propsEqual(other);
  }

  @Override
  public String getEnumDefault() {
    return enumDefault;
  }

  @Override
  int computeHash() {
    final int baseHash = super.computeHash();
    return baseHash + symbols.hashCode();
  }

  /**
   * Writes this enum as JSON: a name reference if it was already written,
   * otherwise the full definition.
   */
  @Override
  void toJson(Names names, JsonGenerator gen) throws IOException {
    if (writeNameRef(names, gen)) {
      return;
    }
    gen.writeStartObject();
    gen.writeStringField("type", "enum");
    writeName(names, gen);
    final String documentation = getDoc();
    if (documentation != null) {
      gen.writeStringField("doc", documentation);
    }
    gen.writeArrayFieldStart("symbols");
    for (String symbol : symbols) {
      gen.writeString(symbol);
    }
    gen.writeEndArray();
    final String defaultSymbol = getEnumDefault();
    if (defaultSymbol != null) {
      gen.writeStringField("default", defaultSymbol);
    }
    writeProps(gen);
    aliasesToJson(gen);
    gen.writeEndObject();
  }
}
/** Schema of an array whose elements all share one schema. */
private static class ArraySchema extends Schema {
  private final Schema elementType;

  public ArraySchema(Schema elementType) {
    super(Type.ARRAY);
    this.elementType = elementType;
  }

  @Override
  public Schema getElementType() {
    return elementType;
  }

  /** Arrays are equal when element schemas and properties are equal. */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ArraySchema)) {
      return false;
    }
    final ArraySchema other = (ArraySchema) o;
    if (!equalCachedHash(other)) {
      return false;
    }
    return elementType.equals(other.elementType) && propsEqual(other);
  }

  @Override
  int computeHash() {
    final int ownHash = super.computeHash();
    final int elementHash = elementType.computeHash();
    return ownHash + elementHash;
  }

  /** Writes {@code {"type":"array","items":...}} followed by the properties. */
  @Override
  void toJson(Names names, JsonGenerator gen) throws IOException {
    gen.writeStartObject();
    gen.writeStringField("type", "array");
    gen.writeFieldName("items");
    elementType.toJson(names, gen);
    writeProps(gen);
    gen.writeEndObject();
  }
}
/** Schema of a map whose values all share one schema. */
private static class MapSchema extends Schema {
  private final Schema valueType;

  public MapSchema(Schema valueType) {
    super(Type.MAP);
    this.valueType = valueType;
  }

  @Override
  public Schema getValueType() {
    return valueType;
  }

  /** Maps are equal when value schemas and properties are equal. */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof MapSchema)) {
      return false;
    }
    final MapSchema other = (MapSchema) o;
    if (!equalCachedHash(other)) {
      return false;
    }
    return valueType.equals(other.valueType) && propsEqual(other);
  }

  @Override
  int computeHash() {
    final int ownHash = super.computeHash();
    final int valueHash = valueType.computeHash();
    return ownHash + valueHash;
  }

  /** Writes {@code {"type":"map","values":...}} followed by the properties. */
  @Override
  void toJson(Names names, JsonGenerator gen) throws IOException {
    gen.writeStartObject();
    gen.writeStringField("type", "map");
    gen.writeFieldName("values");
    valueType.toJson(names, gen);
    writeProps(gen);
    gen.writeEndObject();
  }
}
/**
 * Schema of a union: an ordered list of branch schemas. Branches must not be
 * unions themselves and must have distinct full names.
 */
private static class UnionSchema extends Schema {
  private final List<Schema> types;
  private final Map<String, Integer> indexByName;

  /**
   * @throws AvroRuntimeException if a branch is itself a union, has no full
   *                              name, or shares its full name with an earlier
   *                              branch
   */
  public UnionSchema(LockableArrayList<Schema> types) {
    super(Type.UNION);
    this.indexByName = new HashMap<>(Math.multiplyExact(2, types.size()));
    this.types = types.lock();
    int branchIndex = 0;
    for (Schema branch : types) {
      if (branch.getType() == Type.UNION) {
        throw new AvroRuntimeException("Nested union: " + this);
      }
      final String branchName = branch.getFullName();
      if (branchName == null) {
        throw new AvroRuntimeException("Nameless in union:" + this);
      }
      final Integer previous = indexByName.put(branchName, branchIndex);
      branchIndex++;
      if (previous != null) {
        throw new AvroRuntimeException("Duplicate in union:" + branchName);
      }
    }
  }

  @Override
  public List<Schema> getTypes() {
    return types;
  }

  /** Returns the index of the branch with the given full name, or null. */
  @Override
  public Integer getIndexNamed(String name) {
    return indexByName.get(name);
  }

  /** Unions are equal when branch lists and properties are equal. */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof UnionSchema)) {
      return false;
    }
    final UnionSchema other = (UnionSchema) o;
    if (!equalCachedHash(other)) {
      return false;
    }
    return types.equals(other.types) && propsEqual(other);
  }

  @Override
  int computeHash() {
    int total = super.computeHash();
    for (Schema branch : types) {
      total += branch.computeHash();
    }
    return total;
  }

  /** String-valued properties are rejected on unions. */
  @Override
  public void addProp(String name, String value) {
    throw new AvroRuntimeException("Can't set properties on a union: " + this);
  }

  /** Writes the union as a JSON array of its branches. */
  @Override
  void toJson(Names names, JsonGenerator gen) throws IOException {
    gen.writeStartArray();
    for (Schema branch : types) {
      branch.toJson(names, gen);
    }
    gen.writeEndArray();
  }
}
/** Schema of a named binary value with a fixed number of bytes. */
private static class FixedSchema extends NamedSchema {
  private final int size;

  /**
   * @throws IllegalArgumentException if {@code size} is negative
   */
  public FixedSchema(Name name, String doc, int size) {
    super(Type.FIXED, name, doc);
    if (size < 0) {
      throw new IllegalArgumentException("Invalid fixed size: " + size);
    }
    this.size = size;
  }

  @Override
  public int getFixedSize() {
    return size;
  }

  /** Fixed schemas are equal when names, sizes and properties are equal. */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof FixedSchema)) {
      return false;
    }
    final FixedSchema other = (FixedSchema) o;
    if (!equalCachedHash(other)) {
      return false;
    }
    if (!equalNames(other)) {
      return false;
    }
    return size == other.size && propsEqual(other);
  }

  @Override
  int computeHash() {
    final int baseHash = super.computeHash();
    return baseHash + size;
  }

  /**
   * Writes this fixed schema as JSON: a name reference if it was already
   * written, otherwise the full definition.
   */
  @Override
  void toJson(Names names, JsonGenerator gen) throws IOException {
    if (writeNameRef(names, gen)) {
      return;
    }
    gen.writeStartObject();
    gen.writeStringField("type", "fixed");
    writeName(names, gen);
    final String documentation = getDoc();
    if (documentation != null) {
      gen.writeStringField("doc", documentation);
    }
    gen.writeNumberField("size", size);
    writeProps(gen);
    aliasesToJson(gen);
    gen.writeEndObject();
  }
}
/** Schema subclass for {@link Type#STRING}; declares no state of its own. */
private static class StringSchema extends Schema {
public StringSchema() {
super(Type.STRING);
}
}
/** Schema subclass for {@link Type#BYTES}; declares no state of its own. */
private static class BytesSchema extends Schema {
public BytesSchema() {
super(Type.BYTES);
}
}
/** Schema subclass for {@link Type#INT}; declares no state of its own. */
private static class IntSchema extends Schema {
public IntSchema() {
super(Type.INT);
}
}
/** Schema subclass for {@link Type#LONG}; declares no state of its own. */
private static class LongSchema extends Schema {
public LongSchema() {
super(Type.LONG);
}
}
/** Schema subclass for {@link Type#FLOAT}; declares no state of its own. */
private static class FloatSchema extends Schema {
public FloatSchema() {
super(Type.FLOAT);
}
}
/** Schema subclass for {@link Type#DOUBLE}; declares no state of its own. */
private static class DoubleSchema extends Schema {
public DoubleSchema() {
super(Type.DOUBLE);
}
}
/** Schema subclass for {@link Type#BOOLEAN}; declares no state of its own. */
private static class BooleanSchema extends Schema {
public BooleanSchema() {
super(Type.BOOLEAN);
}
}
/** Schema subclass for {@link Type#NULL}; declares no state of its own. */
private static class NullSchema extends Schema {
public NullSchema() {
super(Type.NULL);
}
}
/**
 * A parser for JSON-format schemas. Each named schema parsed with a parser is
 * added to the names known to the parser so that subsequently parsed schemas
 * may refer to it by name.
 */
public static class Parser {
  private Names names = new Names();
  private boolean validate = true;
  private boolean validateDefaults = true;

  /**
   * Adds the provided types to the set of defined, named types known to this
   * parser.
   *
   * @param types the schemas to register; only the map's values are used
   * @return this parser
   */
  public Parser addTypes(Map<String, Schema> types) {
    for (Schema s : types.values())
      names.add(s);
    return this;
  }

  /**
   * Returns the set of defined, named types known to this parser, as a new map
   * keyed by full name.
   */
  public Map<String, Schema> getTypes() {
    Map<String, Schema> result = new LinkedHashMap<>();
    for (Schema s : names.values())
      result.put(s.getFullName(), s);
    return result;
  }

  /** Enable or disable name validation. */
  public Parser setValidate(boolean validate) {
    this.validate = validate;
    return this;
  }

  /** True iff names are validated. True by default. */
  public boolean getValidate() {
    return this.validate;
  }

  /** Enable or disable default value validation. */
  public Parser setValidateDefaults(boolean validateDefaults) {
    this.validateDefaults = validateDefaults;
    return this;
  }

  /** True iff default values are validated. True by default. */
  public boolean getValidateDefaults() {
    return this.validateDefaults;
  }

  /**
   * Parse a schema from the provided file. If named, the schema is added to the
   * names known to this parser.
   */
  public Schema parse(File file) throws IOException {
    return parse(FACTORY.createParser(file));
  }

  /**
   * Parse a schema from the provided stream. If named, the schema is added to the
   * names known to this parser. The input stream stays open after the parsing.
   */
  public Schema parse(InputStream in) throws IOException {
    return parse(FACTORY.createParser(in).disable(JsonParser.Feature.AUTO_CLOSE_SOURCE));
  }

  /** Read a schema from one or more json strings, concatenated in order. */
  public Schema parse(String s, String... more) {
    StringBuilder b = new StringBuilder(s);
    for (String part : more)
      b.append(part);
    return parse(b.toString());
  }

  /**
   * Parse a schema from the provided string. If named, the schema is added to the
   * names known to this parser.
   *
   * @throws SchemaParseException if reading the string raises an
   *                              {@link IOException}
   */
  public Schema parse(String s) {
    try {
      return parse(FACTORY.createParser(s));
    } catch (IOException e) {
      throw new SchemaParseException(e);
    }
  }

  /**
   * Parses a schema from {@code parser} with this parser's validation settings
   * installed in the per-thread flags, then closes {@code parser}.
   *
   * @throws SchemaParseException if the input is not well-formed JSON
   * @throws IOException          if reading or closing the parser fails
   */
  private Schema parse(JsonParser parser) throws IOException {
    boolean saved = validateNames.get();
    boolean savedValidateDefaults = VALIDATE_DEFAULTS.get();
    try {
      validateNames.set(validate);
      VALIDATE_DEFAULTS.set(validateDefaults);
      return Schema.parse(MAPPER.readTree(parser), names);
    } catch (JsonParseException e) {
      throw new SchemaParseException(e);
    } finally {
      // Restore the per-thread flags before closing: close() may throw, and the
      // flags must not stay overridden on this thread when it does.
      validateNames.set(saved);
      VALIDATE_DEFAULTS.set(savedValidateDefaults);
      parser.close();
    }
  }
}
/**
 * Constructs a Schema object from JSON schema file {@code file}, using a fresh
 * {@link Schema.Parser}. The contents of {@code file} is expected to be in
 * UTF-8 format.
 *
 * @param file The file to read the schema from.
 * @return The freshly built Schema.
 * @throws IOException if there was trouble reading the contents or they are
 *                     invalid
 * @deprecated use {@link Schema.Parser} instead.
 */
@Deprecated
public static Schema parse(File file) throws IOException {
return new Parser().parse(file);
}
/**
 * Constructs a Schema object from JSON schema stream {@code in}, using a fresh
 * {@link Schema.Parser}. The contents of {@code in} is expected to be in UTF-8
 * format.
 *
 * @param in The input stream to read the schema from.
 * @return The freshly built Schema.
 * @throws IOException if there was trouble reading the contents or they are
 *                     invalid
 * @deprecated use {@link Schema.Parser} instead.
 */
@Deprecated
public static Schema parse(InputStream in) throws IOException {
return new Parser().parse(in);
}
/**
 * Construct a schema from <a href="https://json.org/">JSON</a> text, using a
 * fresh {@link Schema.Parser}.
 *
 * @param jsonSchema the JSON text of the schema
 * @return the parsed schema
 * @deprecated use {@link Schema.Parser} instead.
 */
@Deprecated
public static Schema parse(String jsonSchema) {
return new Parser().parse(jsonSchema);
}
/**
 * Construct a schema from <a href="https://json.org/">JSON</a> text, using a
 * fresh {@link Schema.Parser} with name validation set as requested.
 *
 * @param jsonSchema the JSON text of the schema
 * @param validate   true if names should be validated, false if not.
 * @return the parsed schema
 * @deprecated use {@link Schema.Parser} instead.
 */
@Deprecated
public static Schema parse(String jsonSchema, boolean validate) {
return new Parser().setValidate(validate).parse(jsonSchema);
}
/** Maps each primitive type name, as it appears in a JSON schema, to its {@link Type}. */
static final Map<String, Type> PRIMITIVES = new HashMap<>();
// Populated once at class initialization with the eight primitive type names.
static {
PRIMITIVES.put("string", Type.STRING);
PRIMITIVES.put("bytes", Type.BYTES);
PRIMITIVES.put("int", Type.INT);
PRIMITIVES.put("long", Type.LONG);
PRIMITIVES.put("float", Type.FLOAT);
PRIMITIVES.put("double", Type.DOUBLE);
PRIMITIVES.put("boolean", Type.BOOLEAN);
PRIMITIVES.put("null", Type.NULL);
}
/**
 * Insertion-ordered registry of schemas keyed by {@link Name}, plus a mutable
 * default namespace used when resolving names given as plain strings.
 */
static class Names extends LinkedHashMap<Name, Schema> {
  private static final long serialVersionUID = 1L;
  private String space; // default namespace

  /** Creates a registry with no default namespace set. */
  public Names() {
  }

  /** Creates a registry whose default namespace is {@code space}. */
  public Names(String space) {
    this.space = space;
  }

  /** Returns the current default namespace. */
  public String space() {
    return space;
  }

  /** Replaces the current default namespace. */
  public void space(String space) {
    this.space = space;
  }

  /**
   * Resolves a name given as a string. Primitive type names yield the matching
   * primitive schema. Any other name is looked up in the default namespace and,
   * if no entry exists there, in the empty namespace.
   *
   * @return the schema found, or null if the name is not registered
   */
  public Schema get(String o) {
    Type primitive = PRIMITIVES.get(o);
    if (primitive != null) {
      return Schema.create(primitive);
    }
    Name qualified = new Name(o, space);
    // if not in default try anonymous
    Name key = containsKey(qualified) ? qualified : new Name(o, "");
    return super.get(key);
  }

  /** True when an entry exists under the name of the given named schema. */
  public boolean contains(Schema schema) {
    Name key = ((NamedSchema) schema).name;
    return get(key) != null;
  }

  /** Registers the given named schema under its own name. */
  public void add(Schema schema) {
    NamedSchema named = (NamedSchema) schema;
    put(named.name, schema);
  }

  /**
   * Registers {@code schema} under {@code name}.
   *
   * @throws SchemaParseException if the name is already registered
   */
  @Override
  public Schema put(Name name, Schema schema) {
    if (!containsKey(name)) {
      return super.put(name, schema);
    }
    throw new SchemaParseException("Can't redefine: " + name);
  }
}
/** Per-thread switch read by validateName; initially true (names are checked). */
private static ThreadLocal<Boolean> validateNames = ThreadLocal.withInitial(() -> true);
/**
 * Checks a name when name validation is enabled for the current thread, and
 * returns it unchanged. The first character must be a letter or underscore;
 * every later character must be a letter, digit or underscore.
 *
 * @throws SchemaParseException if the name is empty or contains an illegal
 *                              character
 */
private static String validateName(String name) {
  if (!validateNames.get()) {
    return name; // not validating names
  }
  if (name.isEmpty()) {
    throw new SchemaParseException("Empty name");
  }
  for (int i = 0; i < name.length(); i++) {
    char c = name.charAt(i);
    boolean underscore = c == '_';
    if (i == 0) {
      if (!Character.isLetter(c) && !underscore) {
        throw new SchemaParseException("Illegal initial character: " + name);
      }
    } else if (!Character.isLetterOrDigit(c) && !underscore) {
      throw new SchemaParseException("Illegal character in: " + name);
    }
  }
  return name;
}
/** Per-thread switch read by validateDefault; initially true (defaults are checked). */
private static final ThreadLocal<Boolean> VALIDATE_DEFAULTS = ThreadLocal.withInitial(() -> true);
/**
 * Returns {@code defaultValue} unchanged. It is first checked against
 * {@code schema} when default validation is enabled for the current thread and
 * the value is non-null.
 *
 * @throws AvroTypeException if the checked value is not valid for the schema
 */
private static JsonNode validateDefault(String fieldName, Schema schema, JsonNode defaultValue) {
  if (!VALIDATE_DEFAULTS.get() || defaultValue == null) {
    return defaultValue;
  }
  if (isValidDefault(schema, defaultValue)) {
    return defaultValue;
  }
  // invalid default
  throw new AvroTypeException("Invalid default for field " + fieldName + ": " + defaultValue + " not a " + schema);
}
/**
 * Checks whether a JSON value is an acceptable default for a schema.
 * <p>
 * Arrays, maps and records are checked recursively. For a record, a field
 * missing from the JSON object is checked using that field's own default. A
 * union is checked against its first branch only; a union without branches
 * accepts no default.
 *
 * @param schema       the schema the default is declared for
 * @param defaultValue the JSON default, possibly null
 * @return true if the value is valid for the schema; false otherwise, including
 *         when {@code defaultValue} is null
 */
private static boolean isValidDefault(Schema schema, JsonNode defaultValue) {
  if (defaultValue == null)
    return false;
  switch (schema.getType()) {
  case STRING:
  case BYTES:
  case ENUM:
  case FIXED:
    return defaultValue.isTextual();
  case INT:
    return defaultValue.isIntegralNumber() && defaultValue.canConvertToInt();
  case LONG:
    return defaultValue.isIntegralNumber() && defaultValue.canConvertToLong();
  case FLOAT:
  case DOUBLE:
    return defaultValue.isNumber();
  case BOOLEAN:
    return defaultValue.isBoolean();
  case NULL:
    return defaultValue.isNull();
  case ARRAY:
    if (!defaultValue.isArray())
      return false;
    for (JsonNode element : defaultValue)
      if (!isValidDefault(schema.getElementType(), element))
        return false;
    return true;
  case MAP:
    if (!defaultValue.isObject())
      return false;
    for (JsonNode value : defaultValue)
      if (!isValidDefault(schema.getValueType(), value))
        return false;
    return true;
  case UNION: { // union default: first branch
    List<Schema> branches = schema.getTypes();
    // An empty union has no first branch; report the default as invalid rather
    // than failing with an IndexOutOfBoundsException.
    return !branches.isEmpty() && isValidDefault(branches.get(0), defaultValue);
  }
  case RECORD:
    if (!defaultValue.isObject())
      return false;
    for (Field field : schema.getFields())
      if (!isValidDefault(field.schema(),
          defaultValue.has(field.name()) ? defaultValue.get(field.name()) : field.defaultValue()))
        return false;
    return true;
  default:
    return false;
  }
}
/**
 * Builds a {@link Schema} from its parsed JSON form.
 * <ul>
 * <li>A JSON string is resolved through {@code names}, which also handles
 * primitive type names.</li>
 * <li>A JSON object is dispatched on its {@code "type"} attribute. Record,
 * error, enum and fixed definitions are registered in {@code names}.</li>
 * <li>A JSON array is parsed as a union of its elements.</li>
 * </ul>
 * While a record, error, enum or fixed definition is parsed, the default
 * namespace of {@code names} is switched to that definition's namespace. The
 * previous namespace is put back when parsing of the object ends, whether it
 * ends normally or with an exception.
 *
 * @param schema the JSON form of the schema
 * @param names  the known named types; updated with newly defined ones
 * @return the parsed schema
 * @throws SchemaParseException if {@code schema} is null, malformed, or refers
 *                              to an undefined name
 * @see #parse(String)
 */
static Schema parse(JsonNode schema, Names names) {
  if (schema == null) {
    throw new SchemaParseException("Cannot parse <null> schema");
  }
  if (schema.isTextual()) { // name
    Schema result = names.get(schema.textValue());
    if (result == null)
      throw new SchemaParseException("Undefined name: " + schema);
    return result;
  } else if (schema.isObject()) {
    String type = getRequiredText(schema, "type", "No type");
    String savedSpace = names.space();
    try {
      Schema result;
      Name name = null;
      String doc = null;
      if (type.equals("record") || type.equals("error") || type.equals("enum") || type.equals("fixed")) {
        String space = getOptionalText(schema, "namespace");
        doc = getOptionalText(schema, "doc");
        if (space == null)
          space = names.space();
        name = new Name(getRequiredText(schema, "name", "No name in schema"), space);
        names.space(name.space); // set default namespace
      }
      if (PRIMITIVES.containsKey(type)) { // primitive
        result = create(PRIMITIVES.get(type));
      } else if (type.equals("record") || type.equals("error")) { // record
        List<Field> fields = new ArrayList<>();
        result = new RecordSchema(name, doc, type.equals("error"));
        // registered before its fields are parsed so that fields may refer to it
        if (name != null)
          names.add(result);
        JsonNode fieldsNode = schema.get("fields");
        if (fieldsNode == null || !fieldsNode.isArray())
          throw new SchemaParseException("Record has no fields: " + schema);
        for (JsonNode field : fieldsNode) {
          String fieldName = getRequiredText(field, "name", "No field name");
          String fieldDoc = getOptionalText(field, "doc");
          JsonNode fieldTypeNode = field.get("type");
          if (fieldTypeNode == null)
            throw new SchemaParseException("No field type: " + field);
          if (fieldTypeNode.isTextual() && names.get(fieldTypeNode.textValue()) == null)
            throw new SchemaParseException(fieldTypeNode + " is not a defined name." + " The type of the \"" + fieldName
                + "\" field must be" + " a defined name or a {\"type\": ...} expression.");
          Schema fieldSchema = parse(fieldTypeNode, names);
          Field.Order order = Field.Order.ASCENDING;
          JsonNode orderNode = field.get("order");
          if (orderNode != null)
            order = Field.Order.valueOf(orderNode.textValue().toUpperCase(Locale.ENGLISH));
          JsonNode defaultValue = field.get("default");
          // a textual default on a float/double field is converted to a numeric node
          if (defaultValue != null
              && (Type.FLOAT.equals(fieldSchema.getType()) || Type.DOUBLE.equals(fieldSchema.getType()))
              && defaultValue.isTextual())
            defaultValue = new DoubleNode(Double.valueOf(defaultValue.textValue()));
          Field f = new Field(fieldName, fieldSchema, fieldDoc, defaultValue, true, order);
          Iterator<String> i = field.fieldNames();
          while (i.hasNext()) { // add field props
            String prop = i.next();
            if (!FIELD_RESERVED.contains(prop))
              f.addProp(prop, field.get(prop));
          }
          f.aliases = parseAliases(field);
          fields.add(f);
        }
        result.setFields(fields);
      } else if (type.equals("enum")) { // enum
        JsonNode symbolsNode = schema.get("symbols");
        if (symbolsNode == null || !symbolsNode.isArray())
          throw new SchemaParseException("Enum has no symbols: " + schema);
        LockableArrayList<String> symbols = new LockableArrayList<>(symbolsNode.size());
        for (JsonNode n : symbolsNode)
          symbols.add(n.textValue());
        JsonNode enumDefault = schema.get("default");
        String defaultSymbol = null;
        if (enumDefault != null)
          defaultSymbol = enumDefault.textValue();
        result = new EnumSchema(name, doc, symbols, defaultSymbol);
        if (name != null)
          names.add(result);
      } else if (type.equals("array")) { // array
        JsonNode itemsNode = schema.get("items");
        if (itemsNode == null)
          throw new SchemaParseException("Array has no items type: " + schema);
        result = new ArraySchema(parse(itemsNode, names));
      } else if (type.equals("map")) { // map
        JsonNode valuesNode = schema.get("values");
        if (valuesNode == null)
          throw new SchemaParseException("Map has no values type: " + schema);
        result = new MapSchema(parse(valuesNode, names));
      } else if (type.equals("fixed")) { // fixed
        JsonNode sizeNode = schema.get("size");
        if (sizeNode == null || !sizeNode.isInt())
          throw new SchemaParseException("Invalid or no size: " + schema);
        result = new FixedSchema(name, doc, sizeNode.intValue());
        if (name != null)
          names.add(result);
      } else { // For unions with self reference
        Name nameFromType = new Name(type, names.space());
        if (names.containsKey(nameFromType)) {
          return names.get(nameFromType);
        }
        throw new SchemaParseException("Type not supported: " + type);
      }
      Iterator<String> i = schema.fieldNames();
      Set<String> reserved = type.equals("enum") ? ENUM_RESERVED : SCHEMA_RESERVED;
      while (i.hasNext()) { // add properties
        String prop = i.next();
        if (!reserved.contains(prop)) // ignore reserved
          result.addProp(prop, schema.get(prop));
      }
      // parse logical type if present
      result.logicalType = LogicalTypes.fromSchemaIgnoreInvalid(result);
      if (result instanceof NamedSchema) {
        Set<String> aliases = parseAliases(schema);
        if (aliases != null) // add aliases
          for (String alias : aliases)
            result.addAlias(alias);
      }
      return result;
    } finally {
      // restore space, also when parsing fails, so a shared Names is left with
      // the namespace it had before this object was parsed
      names.space(savedSpace);
    }
  } else if (schema.isArray()) { // union
    LockableArrayList<Schema> types = new LockableArrayList<>(schema.size());
    for (JsonNode typeNode : schema)
      types.add(parse(typeNode, names));
    return new UnionSchema(types);
  } else {
    throw new SchemaParseException("Schema not yet supported: " + schema);
  }
}
/**
 * Reads the {@code "aliases"} attribute of a JSON node.
 *
 * @return the aliases in declaration order, or null when the attribute is
 *         absent
 * @throws SchemaParseException if the attribute is not an array or an element
 *                              is not a string
 */
static Set<String> parseAliases(JsonNode node) {
  JsonNode aliasesNode = node.get("aliases");
  if (aliasesNode == null) {
    return null;
  }
  if (!aliasesNode.isArray()) {
    throw new SchemaParseException("aliases not an array: " + node);
  }
  Set<String> collected = new LinkedHashSet<>();
  Iterator<JsonNode> entries = aliasesNode.iterator();
  while (entries.hasNext()) {
    JsonNode entry = entries.next();
    if (!entry.isTextual()) {
      throw new SchemaParseException("alias not a string: " + entry);
    }
    collected.add(entry.textValue());
  }
  return collected;
}
/**
 * Returns the text value stored under {@code key} in {@code container}, failing
 * when no text value is available.
 *
 * @param container Container where to find key.
 * @param key       Key to look for in container.
 * @param error     String to prepend to the SchemaParseException.
 * @throws SchemaParseException if the lookup yields null
 */
private static String getRequiredText(JsonNode container, String key, String error) {
  String text = getOptionalText(container, key);
  if (text != null) {
    return text;
  }
  throw new SchemaParseException(error + ": " + container);
}
/**
 * Returns the text value stored under {@code key} in {@code container}, or null
 * when the key is absent.
 */
private static String getOptionalText(JsonNode container, String key) {
  JsonNode value = container.get(key);
  if (value == null) {
    return null;
  }
  return value.textValue();
}
/**
 * Parses a JSON string into a tree of {@link JsonNode}s.
 *
 * @param s the JSON text
 * @return the tree read from {@code s}
 * @throws RuntimeException wrapping any {@link IOException} raised while
 *                          reading
 */
static JsonNode parseJson(String s) {
  // try-with-resources closes the parser once the tree has been read
  try (JsonParser parser = FACTORY.createParser(s)) {
    return MAPPER.readTree(parser);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}
/**
 * Parses the specified json string to an object: the string is read into a JSON
 * tree with {@code parseJson} and the tree is converted by
 * {@link JacksonUtils#toObject}.
 *
 * @param s the JSON text
 * @return the object produced by {@code JacksonUtils.toObject}
 * @throws RuntimeException if reading the JSON raises an {@link IOException}
 */
public static Object parseJsonToObject(String s) {
return JacksonUtils.toObject(parseJson(s));
}
/**
 * Rewrite a writer's schema using the aliases from a reader's schema. This
 * permits reading records, enums and fixed schemas whose names have changed,
 * and records whose field names have changed. The returned schema always
 * contains the same data elements in the same order, but with possibly
 * different names.
 *
 * @param writer the schema to rewrite
 * @param reader the schema whose aliases are applied
 * @return {@code writer} itself when the two schemas are equal or the reader
 *         declares no aliases; otherwise the rewritten schema
 */
public static Schema applyAliases(Schema writer, Schema reader) {
  if (writer.equals(reader)) {
    return writer; // same schema
  }

  // create indexes of names
  Map<Schema, Schema> visited = new IdentityHashMap<>(1);
  Map<Name, Name> nameAliases = new HashMap<>(1);
  Map<Name, Map<String, String>> fieldNameAliases = new HashMap<>(1);
  getAliases(reader, visited, nameAliases, fieldNameAliases);

  boolean nothingToApply = nameAliases.isEmpty() && fieldNameAliases.isEmpty();
  if (nothingToApply) {
    return writer; // no aliases
  }

  visited.clear();
  return applyAliases(writer, visited, nameAliases, fieldNameAliases);
}
/**
 * Recursively rewrites {@code s}, replacing schema names found in
 * {@code aliases} and record field names found in {@code fieldAliases}.
 *
 * @param s            the schema to rewrite
 * @param seen         records already rewritten, keyed by the original; used to
 *                     break loops in recursive schemas
 * @param aliases      maps an aliased schema name to its replacement
 * @param fieldAliases maps a record name to its field-name replacements
 * @return the rewritten schema, or {@code s} itself when nothing applied
 */
private static Schema applyAliases(Schema s, Map<Schema, Schema> seen, Map<Name, Name> aliases,
    Map<Name, Map<String, String>> fieldAliases) {
  Name name = null;
  if (s instanceof NamedSchema) {
    name = ((NamedSchema) s).name;
  }
  Schema result = s;
  switch (s.getType()) {
  case RECORD: {
    if (seen.containsKey(s)) {
      return seen.get(s); // break loops
    }
    if (aliases.containsKey(name)) {
      name = aliases.get(name);
    }
    result = Schema.createRecord(name.full, s.getDoc(), null, s.isError());
    // recorded before the fields are visited so recursive references resolve
    seen.put(s, result);
    List<Field> rewrittenFields = new ArrayList<>();
    for (Field original : s.getFields()) {
      Schema fieldSchema = applyAliases(original.schema, seen, aliases, fieldAliases);
      String fieldName = getFieldAlias(name, original.name, fieldAliases);
      Field copy = new Field(fieldName, fieldSchema, original.doc, original.defaultValue, true, original.order);
      copy.putAll(original); // copy props
      rewrittenFields.add(copy);
    }
    result.setFields(rewrittenFields);
    break;
  }
  case ENUM: {
    if (aliases.containsKey(name)) {
      result = Schema.createEnum(aliases.get(name).full, s.getDoc(), null, s.getEnumSymbols(), s.getEnumDefault());
    }
    break;
  }
  case ARRAY: {
    Schema element = applyAliases(s.getElementType(), seen, aliases, fieldAliases);
    if (!element.equals(s.getElementType())) {
      result = Schema.createArray(element);
    }
    break;
  }
  case MAP: {
    Schema value = applyAliases(s.getValueType(), seen, aliases, fieldAliases);
    if (!value.equals(s.getValueType())) {
      result = Schema.createMap(value);
    }
    break;
  }
  case UNION: {
    List<Schema> branches = new ArrayList<>();
    for (Schema branch : s.getTypes()) {
      branches.add(applyAliases(branch, seen, aliases, fieldAliases));
    }
    result = Schema.createUnion(branches);
    break;
  }
  case FIXED: {
    if (aliases.containsKey(name)) {
      result = Schema.createFixed(aliases.get(name).full, s.getDoc(), null, s.getFixedSize());
    }
    break;
  }
  default:
    // NO-OP
  }
  if (!result.equals(s)) {
    result.putAll(s); // copy props
  }
  return result;
}
/**
 * Walks {@code schema} and collects the aliases it declares.
 *
 * @param schema       the schema to scan
 * @param seen         records already scanned; used to break loops
 * @param aliases      receives, for each schema alias, the name it stands for
 * @param fieldAliases receives, per record name (and per record alias), a map
 *                     from field alias to field name
 */
private static void getAliases(Schema schema, Map<Schema, Schema> seen, Map<Name, Name> aliases,
    Map<Name, Map<String, String>> fieldAliases) {
  if (schema instanceof NamedSchema) {
    NamedSchema named = (NamedSchema) schema;
    if (named.aliases != null) {
      for (Name alias : named.aliases) {
        aliases.put(alias, named.name);
      }
    }
  }
  switch (schema.getType()) {
  case RECORD: {
    if (seen.containsKey(schema)) {
      return; // break loops
    }
    seen.put(schema, schema);
    RecordSchema record = (RecordSchema) schema;
    for (Field field : schema.getFields()) {
      if (field.aliases != null) {
        for (String fieldAlias : field.aliases) {
          fieldAliases.computeIfAbsent(record.name, k -> new HashMap<>()).put(fieldAlias, field.name);
        }
      }
      getAliases(field.schema, seen, aliases, fieldAliases);
    }
    // make the field aliases reachable under each alias of the record as well
    if (record.aliases != null && fieldAliases.containsKey(record.name)) {
      for (Name recordAlias : record.aliases) {
        fieldAliases.put(recordAlias, fieldAliases.get(record.name));
      }
    }
    break;
  }
  case ARRAY: {
    getAliases(schema.getElementType(), seen, aliases, fieldAliases);
    break;
  }
  case MAP: {
    getAliases(schema.getValueType(), seen, aliases, fieldAliases);
    break;
  }
  case UNION: {
    for (Schema branch : schema.getTypes()) {
      getAliases(branch, seen, aliases, fieldAliases);
    }
    break;
  }
  default:
    break;
  }
}
/**
 * Returns the replacement name registered for {@code field} of {@code record},
 * or {@code field} itself when none is registered.
 */
private static String getFieldAlias(Name record, String field, Map<Name, Map<String, String>> fieldAliases) {
  Map<String, String> recordAliases = fieldAliases.get(record);
  String replacement = recordAliases == null ? null : recordAliases.get(field);
  return replacement == null ? field : replacement;
}
/**
 * An {@link ArrayList} on which no change is permitted once {@link #lock()} has
 * been called.
 * <p>
 * The class keeps a boolean {@code locked} which {@code lock()} sets to
 * {@code true}. It is legal to call {@code lock()} any number of times; any
 * call after the first is a no-op. Once locked, every mutating method
 * overridden here throws {@link IllegalStateException}.
 *
 * @param <E> the element type
 */
static class LockableArrayList<E> extends ArrayList<E> {
  private static final long serialVersionUID = 1L;
  private boolean locked = false;

  /** Creates an empty, unlocked list. */
  public LockableArrayList() {
  }

  /** Creates an empty, unlocked list with the given initial capacity. */
  public LockableArrayList(int size) {
    super(size);
  }

  /** Creates an unlocked list holding the elements of {@code types}. */
  public LockableArrayList(List<E> types) {
    super(types);
  }

  /** Creates an unlocked list holding the given elements, in order. */
  @SafeVarargs
  public LockableArrayList(E... types) {
    super(types.length);
    Collections.addAll(this, types);
  }

  /**
   * Locks this list against further modification.
   *
   * @return this list
   */
  public List<E> lock() {
    locked = true;
    return this;
  }

  /** Throws {@link IllegalStateException} when the list has been locked. */
  private void ensureUnlocked() {
    if (locked) {
      throw new IllegalStateException();
    }
  }

  @Override
  public boolean add(E e) {
    ensureUnlocked();
    return super.add(e);
  }

  @Override
  public void add(int index, E element) {
    ensureUnlocked();
    super.add(index, element);
  }

  @Override
  public E set(int index, E element) {
    ensureUnlocked();
    return super.set(index, element);
  }

  @Override
  public boolean remove(Object o) {
    ensureUnlocked();
    return super.remove(o);
  }

  @Override
  public E remove(int index) {
    ensureUnlocked();
    return super.remove(index);
  }

  @Override
  public boolean addAll(Collection<? extends E> c) {
    ensureUnlocked();
    return super.addAll(c);
  }

  @Override
  public boolean addAll(int index, Collection<? extends E> c) {
    ensureUnlocked();
    return super.addAll(index, c);
  }

  @Override
  public boolean removeAll(Collection<?> c) {
    ensureUnlocked();
    return super.removeAll(c);
  }

  @Override
  public boolean retainAll(Collection<?> c) {
    ensureUnlocked();
    return super.retainAll(c);
  }

  @Override
  public boolean removeIf(java.util.function.Predicate<? super E> filter) {
    ensureUnlocked();
    return super.removeIf(filter);
  }

  @Override
  public void replaceAll(java.util.function.UnaryOperator<E> operator) {
    ensureUnlocked();
    super.replaceAll(operator);
  }

  @Override
  public void sort(java.util.Comparator<? super E> c) {
    ensureUnlocked();
    super.sort(c);
  }

  @Override
  protected void removeRange(int fromIndex, int toIndex) {
    ensureUnlocked();
    super.removeRange(fromIndex, toIndex);
  }

  @Override
  public void clear() {
    ensureUnlocked();
    super.clear();
  }
}
}