blob: 66c2b68bfdd31f633b404835a007aa7256aacaee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.map.ObjectMapper;
/** An abstract data type.
* <p>A schema may be one of:
* <ul>
* <li>A <i>record</i>, mapping field names to field value data;
* <li>An <i>enum</i>, containing one of a small set of symbols;
* <li>An <i>array</i> of values, all of the same schema;
* <li>A <i>map</i>, containing string/value pairs, of a declared schema;
* <li>A <i>union</i> of other schemas;
* <li>A <i>fixed</i> sized binary object;
* <li>A unicode <i>string</i>;
* <li>A sequence of <i>bytes</i>;
* <li>A 32-bit signed <i>int</i>;
* <li>A 64-bit signed <i>long</i>;
* <li>A 32-bit IEEE single-<i>float</i>; or
* <li>A 64-bit IEEE <i>double</i>-float; or
* <li>A <i>boolean</i>; or
* <li><i>null</i>.
* </ul>
*
* A schema can be constructed using one of its static <tt>createXXX</tt>
* methods. The schema objects are <i>logically</i> immutable.
* There are only two mutating methods - {@link #setFields(List)} and
* {@link #addProp(String, String)}. The following restrictions apply on these
* two methods.
* <ul>
* <li> {@link #setFields(List)}, can be called at most once. This method exists
* in order to enable clients to build recursive schemas.
* <li> {@link #addProp(String, String)} can be called with property names
* that are not present already. It is not possible to change or delete an
* existing property.
* </ul>
*/
public abstract class Schema {
static final JsonFactory FACTORY = new JsonFactory();
static final ObjectMapper MAPPER = new ObjectMapper(FACTORY);
static {
FACTORY.enable(JsonParser.Feature.ALLOW_COMMENTS);
FACTORY.setCodec(MAPPER);
}
/** The type of a schema. */
public enum Type {
RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES,
INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL;
private String name;
private Type() { this.name = this.name().toLowerCase(); }
};
private final Type type;
Schema(Type type) { this.type = type; }
/** Create a schema for a primitive type. */
public static Schema create(Type type) {
switch (type) {
case STRING: return new StringSchema();
case BYTES: return new BytesSchema();
case INT: return new IntSchema();
case LONG: return new LongSchema();
case FLOAT: return new FloatSchema();
case DOUBLE: return new DoubleSchema();
case BOOLEAN: return new BooleanSchema();
case NULL: return new NullSchema();
default: throw new AvroRuntimeException("Can't create a: "+type);
}
}
private static final class Props extends HashMap<String,String> {
private Set<String> reserved;
public Props(Set<String> reserved) {
super(1);
this.reserved = reserved;
}
public void add(String name, String value) {
if (reserved.contains(name))
throw new AvroRuntimeException("Can't set reserved property: " + name);
if (value == null)
throw new AvroRuntimeException("Can't set a property to null: " + name);
String old = get(name);
if (old == null)
put(name, value);
else if (!old.equals(value))
throw new AvroRuntimeException("Can't overwrite property: " + name);
}
public void write(JsonGenerator gen) throws IOException {
for (Map.Entry<String,String> e : entrySet())
gen.writeStringField(e.getKey(), e.getValue());
}
}
private static final Set<String> SCHEMA_RESERVED = new HashSet<String>();
static {
Collections.addAll(SCHEMA_RESERVED,
"doc", "fields", "items", "name", "namespace",
"size", "symbols", "values", "type");
}
Props props = new Props(SCHEMA_RESERVED);
/**
* Returns the value of the named property in this schema.
* Returns <tt>null</tt> if there is no property with that name.
*/
public synchronized String getProp(String name) {
return props.get(name);
}
/**
* Adds a property with the given name <tt>name</tt> and
* value <tt>value</tt>. Neither <tt>name</tt> nor <tt>value</tt> can be
* <tt>null</tt>. It is illegal to add a property if another with
* the same name but different value already exists in this schema.
*
* @param name The name of the property to add
* @param value The value for the property to add
*/
public synchronized void addProp(String name, String value) {
props.add(name, value);
}
/** Create an anonymous record schema. */
public static Schema createRecord(List<Field> fields) {
Schema result = createRecord(null, null, null, false);
result.setFields(fields);
return result;
}
/** Create a named record schema. */
public static Schema createRecord(String name, String doc, String namespace,
boolean isError) {
return new RecordSchema(new Name(name, namespace), doc, isError);
}
/** Create an enum schema. */
public static Schema createEnum(String name, String doc, String namespace,
List<String> values) {
return new EnumSchema(new Name(name, namespace), doc,
new LockableArrayList<String>(values));
}
/** Create an array schema. */
public static Schema createArray(Schema elementType) {
return new ArraySchema(elementType);
}
/** Create a map schema. */
public static Schema createMap(Schema valueType) {
return new MapSchema(valueType);
}
/** Create a union schema. */
public static Schema createUnion(List<Schema> types) {
return new UnionSchema(new LockableArrayList<Schema>(types));
}
/** Create a union schema. */
public static Schema createFixed(String name, String doc, String space,
int size) {
return new FixedSchema(new Name(name, space), doc, size);
}
/** Return the type of this schema. */
public Type getType() { return type; }
/**
* If this is a record, returns the Field with the
* given name <tt>fieldName</tt>. If there is no field by that name, a
* <tt>null</tt> is returned.
*/
public Field getField(String fieldname) {
throw new AvroRuntimeException("Not a record: "+this);
}
/**
* If this is a record, returns the fields in it. The returned
* list is in the order of their positions.
*/
public List<Field> getFields() {
throw new AvroRuntimeException("Not a record: "+this);
}
/**
* If this is a record, set its fields. The fields can be set
* only once in a schema.
*/
public void setFields(List<Field> fields) {
throw new AvroRuntimeException("Not a record: "+this);
}
/** If this is an enum, return its symbols. */
public List<String> getEnumSymbols() {
throw new AvroRuntimeException("Not an enum: "+this);
}
/** If this is an enum, return a symbol's ordinal value. */
public int getEnumOrdinal(String symbol) {
throw new AvroRuntimeException("Not an enum: "+this);
}
/** If this is an enum, returns true if it contains given symbol. */
public boolean hasEnumSymbol(String symbol) {
throw new AvroRuntimeException("Not an enum: "+this);
}
/** If this is a record, enum or fixed, returns its name, otherwise the name
* of the primitive type. */
public String getName() { return type.name; }
/** If this is a record, enum, or fixed, returns its docstring,
* if available. Otherwise, returns null. */
public String getDoc() {
return null;
}
/** If this is a record, enum or fixed, returns its namespace, if any. */
public String getNamespace() {
throw new AvroRuntimeException("Not a named type: "+this);
}
/** If this is a record, enum or fixed, returns its namespace-qualified name,
* otherwise returns the name of the primitive type. */
public String getFullName() {
return getName();
}
/** If this is a record, enum or fixed, add an alias. */
public void addAlias(String alias) {
throw new AvroRuntimeException("Not a named type: "+this);
}
/** If this is a record, enum or fixed, return its aliases, if any. */
public Set<String> getAliases() {
throw new AvroRuntimeException("Not a named type: "+this);
}
/** Returns true if this record is an error type. */
public boolean isError() {
throw new AvroRuntimeException("Not a record: "+this);
}
/** If this is an array, returns its element type. */
public Schema getElementType() {
throw new AvroRuntimeException("Not an array: "+this);
}
/** If this is a map, returns its value type. */
public Schema getValueType() {
throw new AvroRuntimeException("Not a map: "+this);
}
/** If this is a union, returns its types. */
public List<Schema> getTypes() {
throw new AvroRuntimeException("Not a union: "+this);
}
/** If this is fixed, returns its size. */
public int getFixedSize() {
throw new AvroRuntimeException("Not fixed: "+this);
}
/** Render this as <a href="http://json.org/">JSON</a>.*/
@Override
public String toString() { return toString(false); }
/** Render this as <a href="http://json.org/">JSON</a>.
* @param pretty if true, pretty-print JSON.
*/
public String toString(boolean pretty) {
try {
StringWriter writer = new StringWriter();
JsonGenerator gen = FACTORY.createJsonGenerator(writer);
if (pretty) gen.useDefaultPrettyPrinter();
toJson(new Names(), gen);
gen.flush();
return writer.toString();
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
}
void toJson(Names names, JsonGenerator gen) throws IOException {
if (props.size() == 0) { // no props defined
gen.writeString(getName()); // just write name
} else {
gen.writeStartObject();
gen.writeStringField("type", getName());
props.write(gen);
gen.writeEndObject();
}
}
void fieldsToJson(Names names, JsonGenerator gen) throws IOException {
throw new AvroRuntimeException("Not a record: "+this);
}
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof Schema)) return false;
Schema that = (Schema)o;
if (!(this.type == that.type)) return false;
return props.equals(that.props);
}
public int hashCode() { return getType().hashCode() + props.hashCode(); }
private static final Set<String> FIELD_RESERVED = new HashSet<String>();
static {
Collections.addAll(FIELD_RESERVED, "default","doc","name","order","type");
}
/** A field within a record. */
public static class Field {
/** How values of this field should be ordered when sorting records. */
public enum Order {
ASCENDING, DESCENDING, IGNORE;
private String name;
private Order() { this.name = this.name().toLowerCase(); }
};
private final String name; // name of the field.
private transient int position = -1;
private final Schema schema;
private final String doc;
private final JsonNode defaultValue;
private final Order order;
private Set<String> aliases;
private final Props props = new Props(FIELD_RESERVED);
public Field(String name, Schema schema, String doc,
JsonNode defaultValue) {
this(name, schema, doc, defaultValue, Order.ASCENDING);
}
public Field(String name, Schema schema, String doc,
JsonNode defaultValue, Order order) {
this.name = validateName(name);
this.schema = schema;
this.doc = doc;
this.defaultValue = defaultValue;
this.order = order;
}
public String name() { return name; };
/** The position of this field within the record. */
public int pos() { return position; }
/** This field's {@link Schema}. */
public Schema schema() { return schema; }
/** Field's documentation within the record, if set. May return null. */
public String doc() { return doc; }
public JsonNode defaultValue() { return defaultValue; }
public Order order() { return order; }
/** Return the value of the named property in this field or null. */
public synchronized String getProp(String name) { return props.get(name); }
/** Add a property with the given name to this field. */
public synchronized void addProp(String name, String value) {
props.add(name, value);
}
public void addAlias(String alias) {
if (aliases == null)
this.aliases = new LinkedHashSet<String>();
aliases.add(alias);
}
public boolean equals(Object other) {
if (other == this) return true;
if (!(other instanceof Field)) return false;
Field that = (Field) other;
return (name.equals(that.name)) &&
(schema.equals(that.schema)) &&
(defaultValue == null
? that.defaultValue == null
: (defaultValue.equals(that.defaultValue))) &&
(order.equals(that.order)) &&
props.equals(that.props);
}
public int hashCode() { return name.hashCode() + schema.hashCode(); }
@Override
public String toString() {
return name + " type:" + schema.type + " pos:" + position;
}
}
private static class Name {
private final String name;
private final String space;
private final String full;
public Name(String name, String space) {
if (name == null) { // anonymous
this.name = this.space = this.full = null;
return;
}
int lastDot = name.lastIndexOf('.');
if (lastDot < 0) { // unqualified name
this.space = space; // use default space
this.name = validateName(name);
} else { // qualified name
this.space = name.substring(0, lastDot); // get space from name
this.name = validateName(name.substring(lastDot+1, name.length()));
}
this.full = (this.space == null) ? this.name : this.space+"."+this.name;
}
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof Name)) return false;
Name that = (Name)o;
return full==null ? that.full==null : full.equals(that.full);
}
public int hashCode() {
return full==null ? 0 : full.hashCode();
}
public String toString() { return full; }
public void writeName(Names names, JsonGenerator gen) throws IOException {
if (name != null) gen.writeStringField("name", name);
if (space != null) {
if (!space.equals(names.space()))
gen.writeStringField("namespace", space);
if (names.space() == null) // default namespace
names.space(space);
}
}
public String getQualified(String defaultSpace) {
return (space == null || space.equals(defaultSpace)) ? name : full;
}
}
private static abstract class NamedSchema extends Schema {
final Name name;
final String doc;
Set<Name> aliases;
public NamedSchema(Type type, Name name, String doc) {
super(type);
this.name = name;
this.doc = doc;
if (PRIMITIVES.containsKey(name.full)) {
throw new AvroTypeException("Schemas may not be named after primitives: " + name.full);
}
}
public String getName() { return name.name; }
public String getDoc() { return doc; }
public String getNamespace() { return name.space; }
public String getFullName() { return name.full; }
public void addAlias(String alias) {
if (aliases == null)
this.aliases = new LinkedHashSet<Name>();
aliases.add(new Name(alias, name.space));
}
public Set<String> getAliases() {
Set<String> result = new LinkedHashSet<String>();
if (aliases != null)
for (Name alias : aliases)
result.add(alias.full);
return result;
}
public boolean writeNameRef(Names names, JsonGenerator gen)
throws IOException {
if (this.equals(names.get(name))) {
gen.writeString(name.getQualified(names.space()));
return true;
} else if (name.name != null) {
names.put(name, this);
}
return false;
}
public void writeName(Names names, JsonGenerator gen) throws IOException {
name.writeName(names, gen);
}
public boolean equalNames(NamedSchema that) {
return this.name.equals(that.name);
}
public int hashCode() {
return getType().hashCode() + name.hashCode() + props.hashCode();
}
public void aliasesToJson(JsonGenerator gen) throws IOException {
if (aliases == null) return;
gen.writeFieldName("aliases");
gen.writeStartArray();
for (Name alias : aliases)
gen.writeString(alias.getQualified(name.space));
gen.writeEndArray();
}
}
private static class SeenPair {
private Object s1; private Object s2;
private SeenPair(Object s1, Object s2) { this.s1 = s1; this.s2 = s2; }
public boolean equals(Object o) {
return this.s1 == ((SeenPair)o).s1 && this.s2 == ((SeenPair)o).s2;
}
public int hashCode() {
return System.identityHashCode(s1) + System.identityHashCode(s2);
}
}
private static final ThreadLocal<Set> SEEN_EQUALS = new ThreadLocal<Set>() {
protected Set initialValue() { return new HashSet(); }
};
private static final ThreadLocal<Map> SEEN_HASHCODE = new ThreadLocal<Map>() {
protected Map initialValue() { return new IdentityHashMap(); }
};
@SuppressWarnings(value="unchecked")
private static class RecordSchema extends NamedSchema {
private List<Field> fields;
private Map<String, Field> fieldMap;
private final boolean isError;
public RecordSchema(Name name, String doc, boolean isError) {
super(Type.RECORD, name, doc);
this.isError = isError;
}
public boolean isError() { return isError; }
@Override
public Field getField(String fieldname) {
return fieldMap.get(fieldname);
}
@Override
public List<Field> getFields() {
return fields;
}
@Override
public void setFields(List<Field> fields) {
if (this.fields != null) {
throw new AvroRuntimeException("Fields are already set");
}
int i = 0;
fieldMap = new HashMap<String, Field>();
LockableArrayList ff = new LockableArrayList();
for (Field f : fields) {
if (f.position != -1)
throw new AvroRuntimeException("Field already used: " + f);
f.position = i++;
fieldMap.put(f.name(), f);
ff.add(f);
}
this.fields = ff.lock();
}
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof RecordSchema)) return false;
RecordSchema that = (RecordSchema)o;
if (!equalNames(that)) return false;
if (!props.equals(that.props)) return false;
Set seen = SEEN_EQUALS.get();
SeenPair here = new SeenPair(this, o);
if (seen.contains(here)) return true; // prevent stack overflow
boolean first = seen.isEmpty();
try {
seen.add(here);
return fields.equals(((RecordSchema)o).fields);
} finally {
if (first) seen.clear();
}
}
public int hashCode() {
Map seen = SEEN_HASHCODE.get();
if (seen.containsKey(this)) return 0; // prevent stack overflow
boolean first = seen.isEmpty();
try {
seen.put(this, this);
return super.hashCode() + fields.hashCode();
} finally {
if (first) seen.clear();
}
}
void toJson(Names names, JsonGenerator gen) throws IOException {
if (writeNameRef(names, gen)) return;
String savedSpace = names.space; // save namespace
gen.writeStartObject();
gen.writeStringField("type", isError?"error":"record");
writeName(names, gen);
names.space = name.space; // set default namespace
gen.writeFieldName("fields");
fieldsToJson(names, gen);
props.write(gen);
aliasesToJson(gen);
gen.writeEndObject();
names.space = savedSpace; // restore namespace
}
void fieldsToJson(Names names, JsonGenerator gen) throws IOException {
gen.writeStartArray();
for (Field f : fields) {
gen.writeStartObject();
gen.writeStringField("name", f.name());
gen.writeFieldName("type");
f.schema().toJson(names, gen);
if (f.doc() != null)
gen.writeStringField("doc", f.doc());
if (f.defaultValue() != null) {
gen.writeFieldName("default");
gen.writeTree(f.defaultValue());
}
if (f.order() != Field.Order.ASCENDING)
gen.writeStringField("order", f.order().name);
if (f.aliases != null) {
gen.writeFieldName("aliases");
gen.writeStartArray();
for (String alias : f.aliases)
gen.writeString(alias);
gen.writeEndArray();
}
f.props.write(gen);
gen.writeEndObject();
}
gen.writeEndArray();
}
}
private static class EnumSchema extends NamedSchema {
private final List<String> symbols;
private final Map<String,Integer> ordinals;
public EnumSchema(Name name, String doc,
LockableArrayList<String> symbols) {
super(Type.ENUM, name, doc);
this.symbols = symbols.lock();
this.ordinals = new HashMap<String,Integer>();
int i = 0;
for (String symbol : symbols)
if (ordinals.put(validateName(symbol), i++) != null)
throw new SchemaParseException("Duplicate enum symbol: "+symbol);
}
public List<String> getEnumSymbols() { return symbols; }
public boolean hasEnumSymbol(String symbol) {
return ordinals.containsKey(symbol); }
public int getEnumOrdinal(String symbol) { return ordinals.get(symbol); }
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof EnumSchema)) return false;
EnumSchema that = (EnumSchema)o;
return equalNames(that)
&& symbols.equals(that.symbols)
&& props.equals(that.props);
}
public int hashCode() { return super.hashCode() + symbols.hashCode(); }
void toJson(Names names, JsonGenerator gen) throws IOException {
if (writeNameRef(names, gen)) return;
gen.writeStartObject();
gen.writeStringField("type", "enum");
writeName(names, gen);
gen.writeArrayFieldStart("symbols");
for (String symbol : symbols)
gen.writeString(symbol);
gen.writeEndArray();
props.write(gen);
aliasesToJson(gen);
gen.writeEndObject();
}
}
private static class ArraySchema extends Schema {
private final Schema elementType;
public ArraySchema(Schema elementType) {
super(Type.ARRAY);
this.elementType = elementType;
}
public Schema getElementType() { return elementType; }
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof ArraySchema)) return false;
ArraySchema that = (ArraySchema)o;
return elementType.equals(that.elementType) && props.equals(that.props);
}
public int hashCode() {
return getType().hashCode() + elementType.hashCode() + props.hashCode();
}
void toJson(Names names, JsonGenerator gen) throws IOException {
gen.writeStartObject();
gen.writeStringField("type", "array");
gen.writeFieldName("items");
elementType.toJson(names, gen);
props.write(gen);
gen.writeEndObject();
}
}
private static class MapSchema extends Schema {
private final Schema valueType;
public MapSchema(Schema valueType) {
super(Type.MAP);
this.valueType = valueType;
}
public Schema getValueType() { return valueType; }
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof MapSchema)) return false;
MapSchema that = (MapSchema)o;
return valueType.equals(that.valueType) && props.equals(that.props);
}
public int hashCode() {
return getType().hashCode() + valueType.hashCode() + props.hashCode();
}
void toJson(Names names, JsonGenerator gen) throws IOException {
gen.writeStartObject();
gen.writeStringField("type", "map");
gen.writeFieldName("values");
valueType.toJson(names, gen);
props.write(gen);
gen.writeEndObject();
}
}
private static class UnionSchema extends Schema {
private final List<Schema> types;
public UnionSchema(LockableArrayList<Schema> types) {
super(Type.UNION);
this.types = types.lock();
int seen = 0;
Set<String> seenNames = new HashSet<String>();
for (Schema type : types) { // check legality of union
switch (type.getType()) {
case UNION:
throw new AvroRuntimeException("Nested union: "+this);
case RECORD:
case FIXED:
case ENUM:
String fullname = type.getFullName();
if (fullname != null) {
if (seenNames.add(fullname)) {
continue;
} else {
throw new AvroRuntimeException("Duplicate name in union:" + fullname);
}
} else {
throw new AvroRuntimeException("Nameless Record, Fixed, or Enum in union:"+this);
}
default:
int mask = 1 << type.getType().ordinal();
if ((seen & mask) != 0)
throw new AvroRuntimeException("Ambiguous union: "+this);
seen |= mask;
}
}
}
public List<Schema> getTypes() { return types; }
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof UnionSchema)) return false;
UnionSchema that = (UnionSchema)o;
return types.equals(that.types) && props.equals(that.props);
}
public int hashCode() {
return getType().hashCode() + types.hashCode() + props.hashCode();
}
@Override
public void addProp(String name, String value) {
throw new AvroRuntimeException("Can't set properties on a union: "+this);
}
void toJson(Names names, JsonGenerator gen) throws IOException {
gen.writeStartArray();
for (Schema type : types)
type.toJson(names, gen);
gen.writeEndArray();
}
}
private static class FixedSchema extends NamedSchema {
private final int size;
public FixedSchema(Name name, String doc, int size) {
super(Type.FIXED, name, doc);
if (size < 0)
throw new IllegalArgumentException("Invalid fixed size: "+size);
this.size = size;
}
public int getFixedSize() { return size; }
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof FixedSchema)) return false;
FixedSchema that = (FixedSchema)o;
return equalNames(that) && size == that.size && props.equals(that.props);
}
public int hashCode() { return super.hashCode() + size; }
void toJson(Names names, JsonGenerator gen) throws IOException {
if (writeNameRef(names, gen)) return;
gen.writeStartObject();
gen.writeStringField("type", "fixed");
writeName(names, gen);
gen.writeNumberField("size", size);
props.write(gen);
aliasesToJson(gen);
gen.writeEndObject();
}
}
private static class StringSchema extends Schema {
public StringSchema() { super(Type.STRING); }
}
private static class BytesSchema extends Schema {
public BytesSchema() { super(Type.BYTES); }
}
private static class IntSchema extends Schema {
public IntSchema() { super(Type.INT); }
}
private static class LongSchema extends Schema {
public LongSchema() { super(Type.LONG); }
}
private static class FloatSchema extends Schema {
public FloatSchema() { super(Type.FLOAT); }
}
private static class DoubleSchema extends Schema {
public DoubleSchema() { super(Type.DOUBLE); }
}
private static class BooleanSchema extends Schema {
public BooleanSchema() { super(Type.BOOLEAN); }
}
private static class NullSchema extends Schema {
public NullSchema() { super(Type.NULL); }
}
/**
* Constructs a Schema object from JSON schema file <tt>file</tt>.
* The contents of <tt>file</tt> is expected to be in UTF-8 format.
* @param file The file to read the schema from.
* @return The freshly built Schema.
* @throws IOException if there was trouble reading the contents
* @throws JsonParseException if the contents are invalid
*/
public static Schema parse(File file) throws IOException {
JsonParser parser = FACTORY.createJsonParser(file);
try {
return Schema.parse(MAPPER.readTree(parser), new Names());
} catch (JsonParseException e) {
throw new SchemaParseException(e);
}
}
/**
* Constructs a Schema object from JSON schema stream <tt>in</tt>.
* The contents of <tt>in</tt> is expected to be in UTF-8 format.
* @param in The input stream to read the schema from.
* @return The freshly built Schema.
* @throws IOException if there was trouble reading the contents
* @throws JsonParseException if the contents are invalid
*/
public static Schema parse(InputStream in) throws IOException {
JsonParser parser = FACTORY.createJsonParser(in);
try {
return Schema.parse(MAPPER.readTree(parser), new Names());
} catch (JsonParseException e) {
throw new SchemaParseException(e);
}
}
/** Construct a schema from <a href="http://json.org/">JSON</a> text. */
public static Schema parse(String jsonSchema) {
return parse(parseJson(jsonSchema), new Names());
}
static final Map<String,Type> PRIMITIVES = new HashMap<String,Type>();
static {
PRIMITIVES.put("string", Type.STRING);
PRIMITIVES.put("bytes", Type.BYTES);
PRIMITIVES.put("int", Type.INT);
PRIMITIVES.put("long", Type.LONG);
PRIMITIVES.put("float", Type.FLOAT);
PRIMITIVES.put("double", Type.DOUBLE);
PRIMITIVES.put("boolean", Type.BOOLEAN);
PRIMITIVES.put("null", Type.NULL);
}
static class Names extends LinkedHashMap<Name, Schema> {
private String space; // default namespace
public Names() {}
public Names(String space) { this.space = space; }
public String space() { return space; }
public void space(String space) { this.space = space; }
@Override
public Schema get(Object o) {
Name name;
if (o instanceof String) {
Type primitive = PRIMITIVES.get((String)o);
if (primitive != null) return Schema.create(primitive);
name = new Name((String)o, space);
} else {
name = (Name)o;
}
return super.get(name);
}
public boolean contains(Schema schema) {
return get(((NamedSchema)schema).name) != null;
}
public void add(Schema schema) {
put(((NamedSchema)schema).name, schema);
}
@Override
public Schema put(Name name, Schema schema) {
if (containsKey(name))
throw new SchemaParseException("Can't redefine: "+name);
return super.put(name, schema);
}
}
private static String validateName(String name) {
int length = name.length();
if (length == 0)
throw new SchemaParseException("Empty name");
char first = name.charAt(0);
if (!(Character.isLetter(first) || first == '_'))
throw new SchemaParseException("Illegal initial character: "+name);
for (int i = 1; i < length; i++) {
char c = name.charAt(i);
if (!(Character.isLetterOrDigit(c) || c == '_'))
throw new SchemaParseException("Illegal character in: "+name);
}
return name;
}
/** @see #parse(String) */
static Schema parse(JsonNode schema, Names names) {
if (schema.isTextual()) { // name
Schema result = names.get(schema.getTextValue());
if (result == null)
throw new SchemaParseException("Undefined name: "+schema);
return result;
} else if (schema.isObject()) {
Schema result;
String type = getRequiredText(schema, "type", "No type");
Name name = null;
String savedSpace = null;
String doc = null;
if (type.equals("record") || type.equals("error")
|| type.equals("enum") || type.equals("fixed")) {
String space = getOptionalText(schema, "namespace");
doc = getOptionalText(schema, "doc");
if (space == null)
space = names.space();
name = new Name(getRequiredText(schema, "name", "No name in schema"),
space);
if (name.space != null) { // set default namespace
savedSpace = names.space();
names.space(name.space);
}
}
if (PRIMITIVES.containsKey(type)) { // primitive
result = create(PRIMITIVES.get(type));
} else if (type.equals("record") || type.equals("error")) { // record
List<Field> fields = new ArrayList<Field>();
result = new RecordSchema(name, doc, type.equals("error"));
if (name != null) names.add(result);
JsonNode fieldsNode = schema.get("fields");
if (fieldsNode == null || !fieldsNode.isArray())
throw new SchemaParseException("Record has no fields: "+schema);
for (JsonNode field : fieldsNode) {
String fieldName = getRequiredText(field, "name", "No field name");
String fieldDoc = getOptionalText(field, "doc");
JsonNode fieldTypeNode = field.get("type");
if (fieldTypeNode == null)
throw new SchemaParseException("No field type: "+field);
if (fieldTypeNode.isTextual()
&& names.get(fieldTypeNode.getTextValue()) == null)
throw new SchemaParseException
(fieldTypeNode+" is not a defined name."
+" The type of the \""+fieldName+"\" field must be"
+" a defined name or a {\"type\": ...} expression.");
Schema fieldSchema = parse(fieldTypeNode, names);
Field.Order order = Field.Order.ASCENDING;
JsonNode orderNode = field.get("order");
if (orderNode != null)
order = Field.Order.valueOf(orderNode.getTextValue().toUpperCase());
Field f = new Field(fieldName, fieldSchema,
fieldDoc, field.get("default"), order);
Iterator<String> i = field.getFieldNames();
while (i.hasNext()) { // add field props
String prop = i.next();
String value = field.get(prop).getTextValue();
if (!FIELD_RESERVED.contains(prop) && value != null)
f.addProp(prop, value);
}
f.aliases = parseAliases(field);
fields.add(f);
}
result.setFields(fields);
} else if (type.equals("enum")) { // enum
JsonNode symbolsNode = schema.get("symbols");
if (symbolsNode == null || !symbolsNode.isArray())
throw new SchemaParseException("Enum has no symbols: "+schema);
LockableArrayList<String> symbols = new LockableArrayList<String>();
for (JsonNode n : symbolsNode)
symbols.add(n.getTextValue());
result = new EnumSchema(name, doc, symbols);
if (name != null) names.add(result);
} else if (type.equals("array")) { // array
JsonNode itemsNode = schema.get("items");
if (itemsNode == null)
throw new SchemaParseException("Array has no items type: "+schema);
result = new ArraySchema(parse(itemsNode, names));
} else if (type.equals("map")) { // map
JsonNode valuesNode = schema.get("values");
if (valuesNode == null)
throw new SchemaParseException("Map has no values type: "+schema);
result = new MapSchema(parse(valuesNode, names));
} else if (type.equals("fixed")) { // fixed
JsonNode sizeNode = schema.get("size");
if (sizeNode == null || !sizeNode.isInt())
throw new SchemaParseException("Invalid or no size: "+schema);
result = new FixedSchema(name, doc, sizeNode.getIntValue());
if (name != null) names.add(result);
} else
throw new SchemaParseException("Type not supported: "+type);
Iterator<String> i = schema.getFieldNames();
while (i.hasNext()) { // add properties
String prop = i.next();
String value = schema.get(prop).getTextValue();
if (!SCHEMA_RESERVED.contains(prop) && value != null) // ignore reserved
result.addProp(prop, value);
}
if (savedSpace != null)
names.space(savedSpace); // restore space
if (result instanceof NamedSchema) {
Set<String> aliases = parseAliases(schema);
if (aliases != null) // add aliases
for (String alias : aliases)
result.addAlias(alias);
}
return result;
} else if (schema.isArray()) { // union
LockableArrayList<Schema> types =
new LockableArrayList<Schema>(schema.size());
for (JsonNode typeNode : schema)
types.add(parse(typeNode, names));
return new UnionSchema(types);
} else {
throw new SchemaParseException("Schema not yet supported: "+schema);
}
}
private static Set<String> parseAliases(JsonNode node) {
JsonNode aliasesNode = node.get("aliases");
if (aliasesNode == null)
return null;
if (!aliasesNode.isArray())
throw new SchemaParseException("aliases not an array: "+node);
Set<String> aliases = new LinkedHashSet<String>();
for (JsonNode aliasNode : aliasesNode) {
if (!aliasNode.isTextual())
throw new SchemaParseException("alias not a string: "+aliasNode);
aliases.add(aliasNode.getTextValue());
}
return aliases;
}
/** Extracts text value associated to key from the container JsonNode,
* and throws {@link SchemaParseException} if it doesn't exist.
*
* @param container Container where to find key.
* @param key Key to look for in container.
* @param error String to prepend to the SchemaParseException.
* @return
*/
private static String getRequiredText(JsonNode container, String key,
String error) {
String out = getOptionalText(container, key);
if (null == out) {
throw new SchemaParseException(error + ": " + container);
}
return out;
}
/** Extracts text value associated to key from the container JsonNode. */
private static String getOptionalText(JsonNode container, String key) {
JsonNode jsonNode = container.get(key);
return jsonNode != null ? jsonNode.getTextValue() : null;
}
static JsonNode parseJson(String s) {
try {
return MAPPER.readTree(FACTORY.createJsonParser(new StringReader(s)));
} catch (JsonParseException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/** Rewrite a writer's schema using the aliases from a reader's schema. This
* permits reading records, enums and fixed schemas whose names have changed,
* and records whose field names have changed. The returned schema always
* contains the same data elements in the same order, but with possibly
* different names. */
public static Schema applyAliases(Schema writer, Schema reader) {
if (writer == reader) return writer; // same schema
// create indexes of names
Map<Schema,Schema> seen = new IdentityHashMap<Schema,Schema>(1);
Map<Name,Name> aliases = new HashMap<Name, Name>(1);
Map<Name,Map<String,String>> fieldAliases =
new HashMap<Name, Map<String,String>>(1);
getAliases(reader, seen, aliases, fieldAliases);
if (aliases.size() == 0 && fieldAliases.size() == 0)
return writer; // no aliases
seen.clear();
return applyAliases(writer, seen, aliases, fieldAliases);
}
private static Schema applyAliases(Schema s, Map<Schema,Schema> seen,
Map<Name,Name> aliases,
Map<Name,Map<String,String>> fieldAliases){
Name name = s instanceof NamedSchema ? ((NamedSchema)s).name : null;
Schema result = s;
switch (s.getType()) {
case RECORD:
if (seen.containsKey(s)) return seen.get(s); // break loops
if (aliases.containsKey(name))
name = aliases.get(name);
result = Schema.createRecord(name.full, s.getDoc(), null, s.isError());
seen.put(s, result);
List<Field> newFields = new ArrayList<Field>();
for (Field f : s.getFields()) {
Schema fSchema = applyAliases(f.schema, seen, aliases, fieldAliases);
String fName = getFieldAlias(name, f.name, fieldAliases);
Field newF = new Field(fName, fSchema, f.doc, f.defaultValue, f.order);
newF.props.putAll(f.props); // copy props
newFields.add(newF);
}
result.setFields(newFields);
break;
case ENUM:
if (aliases.containsKey(name))
result = Schema.createEnum(aliases.get(name).full, s.getDoc(), null,
s.getEnumSymbols());
break;
case ARRAY:
Schema e = applyAliases(s.getElementType(), seen, aliases, fieldAliases);
if (e != s.getElementType())
result = Schema.createArray(e);
break;
case MAP:
Schema v = applyAliases(s.getValueType(), seen, aliases, fieldAliases);
if (v != s.getValueType())
result = Schema.createMap(v);
break;
case UNION:
List<Schema> types = new ArrayList<Schema>();
for (Schema branch : s.getTypes())
types.add(applyAliases(branch, seen, aliases, fieldAliases));
result = Schema.createUnion(types);
break;
case FIXED:
if (aliases.containsKey(name))
result = Schema.createFixed(aliases.get(name).full, s.getDoc(), null,
s.getFixedSize());
break;
}
if (result != s)
result.props.putAll(s.props); // copy props
return result;
}
private static void getAliases(Schema schema,
Map<Schema,Schema> seen,
Map<Name,Name> aliases,
Map<Name,Map<String,String>> fieldAliases) {
if (schema instanceof NamedSchema) {
NamedSchema namedSchema = (NamedSchema)schema;
if (namedSchema.aliases != null)
for (Name alias : namedSchema.aliases)
aliases.put(alias, namedSchema.name);
}
switch (schema.getType()) {
case RECORD:
if (seen.containsKey(schema)) return; // break loops
seen.put(schema, schema);
RecordSchema record = (RecordSchema)schema;
for (Field field : schema.getFields()) {
if (field.aliases != null)
for (String fieldAlias : field.aliases) {
Map<String,String> recordAliases = fieldAliases.get(record.name);
if (recordAliases == null)
fieldAliases.put(record.name,
recordAliases = new HashMap<String,String>());
recordAliases.put(fieldAlias, field.name);
}
getAliases(field.schema, seen, aliases, fieldAliases);
}
if (record.aliases != null && fieldAliases.containsKey(record.name))
for (Name recordAlias : record.aliases)
fieldAliases.put(recordAlias, fieldAliases.get(record.name));
break;
case ARRAY:
getAliases(schema.getElementType(), seen, aliases, fieldAliases);
break;
case MAP:
getAliases(schema.getValueType(), seen, aliases, fieldAliases);
break;
case UNION:
for (Schema s : schema.getTypes())
getAliases(s, seen, aliases, fieldAliases);
break;
}
}
private static String getFieldAlias
(Name record, String field, Map<Name,Map<String,String>> fieldAliases) {
Map<String,String> recordAliases = fieldAliases.get(record);
if (recordAliases == null)
return field;
String alias = recordAliases.get(field);
if (alias == null)
return field;
return alias;
}
/**
* No change is permitted on LockableArrayList once lock() has been
* called on it.
* @param <E>
*/
/*
* This class keeps a boolean variable <tt>locked</tt> which is set
* to <tt>true</tt> in the lock() method. It's legal to call
* lock() any number of times. Any lock() other than the first one
* is a no-op.
*
* This class throws <tt>IllegalStateException</tt> if a mutating
* operation is performed after being locked. Since modifications through
* iterator also use the list's mutating operations, this effectively
* blocks all modifications.
*/
static class LockableArrayList<E> extends ArrayList<E> {
private static final long serialVersionUID = 1L;
private boolean locked = false;
public LockableArrayList() {
}
public LockableArrayList(int size) {
super(size);
}
public LockableArrayList(List<E> types) {
super(types);
}
public List<E> lock() {
locked = true;
return this;
}
private void ensureUnlocked() {
if (locked) {
throw new IllegalStateException();
}
}
public boolean add(E e) {
ensureUnlocked();
return super.add(e);
}
public boolean remove(Object o) {
ensureUnlocked();
return super.remove(o);
}
public E remove(int index) {
ensureUnlocked();
return super.remove(index);
}
public boolean addAll(Collection<? extends E> c) {
ensureUnlocked();
return super.addAll(c);
}
public boolean addAll(int index, Collection<? extends E> c) {
ensureUnlocked();
return super.addAll(index, c);
}
public boolean removeAll(Collection<?> c) {
ensureUnlocked();
return super.removeAll(c);
}
public boolean retainAll(Collection<?> c) {
ensureUnlocked();
return super.retainAll(c);
}
public void clear() {
ensureUnlocked();
super.clear();
}
}
}