blob: 73e235c89a9dcf3a004125d85b6c38bd641fdfa0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.StringWriter;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.HashSet;
import org.apache.avro.Schema.Field;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.node.TextNode;
/** A set of messages forming an application protocol.
* <p> A protocol consists of:
* <ul>
* <li>a <i>name</i> for the protocol;
* <li>an optional <i>namespace</i>, further qualifying the name;
* <li>a list of <i>types</i>, or named {@link Schema schemas};
* <li>a list of <i>errors</i>, or named {@link Schema schemas} for exceptions;
* <li>a list of named <i>messages</i>, each of which specifies,
* <ul>
* <li><i>request</i>, the parameter schemas;
* <li>one of either;
* <ul><li>one-way</li></ul>
* or
* <ul>
* <li><i>response</i>, the response schema;
* <li><i>errors</i>, an optional list of potential error schema names.
* </ul>
* </ul>
* </ul>
*/
public class Protocol extends JsonProperties {
/** The version of the protocol specification implemented here. */
public static final long VERSION = 1;
// Support properties for both Protocol and Message objects
/**
 * JSON keys that have a built-in meaning inside a message declaration.
 * Any other key found on a message is collected as a property by
 * parseMessage, and this set is handed to the Message superclass
 * constructor.
 */
private static final Set<String> MESSAGE_RESERVED = new HashSet<String>();
static {
Collections.addAll(MESSAGE_RESERVED,
"doc", "response","request", "errors", "one-way");
}
/**
 * JSON keys that have a built-in meaning inside a request parameter
 * declaration. parseMessage copies every other key of a parameter onto
 * the resulting Field as a property.
 */
private static final Set<String> FIELD_RESERVED = new HashSet<String>();
static {
Collections.addAll(FIELD_RESERVED,
"name", "type", "doc", "default", "aliases");
}
/** A protocol message. */
public class Message extends JsonProperties {
private String name;
private String doc;
private Schema request;
/** Construct a message. */
private Message(String name, String doc,
Map<String,?> propMap, Schema request) {
super(MESSAGE_RESERVED);
this.name = name;
this.doc = doc;
this.request = request;
if (propMap != null) // copy props
for (Map.Entry<String,?> prop : propMap.entrySet()) {
Object value = prop.getValue();
this.addProp(prop.getKey(),
value instanceof String
? TextNode.valueOf((String)value)
: (JsonNode)value);
}
}
/** The name of this message. */
public String getName() { return name; }
/** The parameters of this message. */
public Schema getRequest() { return request; }
/** The returned data. */
public Schema getResponse() { return Schema.create(Schema.Type.NULL); }
/** Errors that might be thrown. */
public Schema getErrors() {
return Schema.createUnion(new ArrayList<Schema>());
}
/** Returns true if this is a one-way message, with no response or errors.*/
public boolean isOneWay() { return true; }
public String toString() {
try {
StringWriter writer = new StringWriter();
JsonGenerator gen = Schema.FACTORY.createJsonGenerator(writer);
toJson(gen);
gen.flush();
return writer.toString();
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
}
void toJson(JsonGenerator gen) throws IOException {
gen.writeStartObject();
if (doc != null) gen.writeStringField("doc", doc);
writeProps(gen); // write out properties
gen.writeFieldName("request");
request.fieldsToJson(types, gen);
toJson1(gen);
gen.writeEndObject();
}
void toJson1(JsonGenerator gen) throws IOException {
gen.writeStringField("response", "null");
gen.writeBooleanField("one-way", true);
}
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof Message)) return false;
Message that = (Message)o;
return this.name.equals(that.name)
&& this.request.equals(that.request)
&& props.equals(that.props);
}
public int hashCode() {
return name.hashCode() + request.hashCode() + props.hashCode();
}
public String getDoc() { return doc; }
}
/** A message that expects a reply: it carries a response schema and an error union. */
private class TwoWayMessage extends Message {
  private Schema response;
  private Schema errors;

  /** Construct a two-way message from its request, response and error schemas. */
  private TwoWayMessage(String name, String doc, Map<String,?> propMap,
                        Schema request, Schema response, Schema errors) {
    super(name, doc, propMap, request);
    this.response = response;
    this.errors = errors;
  }

  /** The schema of the value returned by this message. */
  @Override public Schema getResponse() {
    return this.response;
  }

  /** The union of errors this message may raise. */
  @Override public Schema getErrors() {
    return this.errors;
  }

  /** Always false: a two-way message has a response. */
  @Override public boolean isOneWay() {
    return false;
  }

  /** Equal when the base-class comparison holds and response and errors match too. */
  @Override public boolean equals(Object o) {
    if (!super.equals(o) || !(o instanceof TwoWayMessage)) {
      return false;
    }
    TwoWayMessage other = (TwoWayMessage)o;
    if (!this.response.equals(other.response)) {
      return false;
    }
    return this.errors.equals(other.errors);
  }

  /** Extends the base-class hash with the response and error schemas. */
  @Override public int hashCode() {
    int hash = super.hashCode();
    hash += response.hashCode();
    hash += errors.hashCode();
    return hash;
  }

  /**
   * Writes the "response" field and, when more than one error branch exists,
   * an "errors" field holding every branch except the first.
   */
  @Override void toJson1(JsonGenerator gen) throws IOException {
    gen.writeFieldName("response");
    response.toJson(types, gen);

    // The first branch is skipped: parseMessage always puts SYSTEM_ERROR there.
    // NOTE(review): messages built through createMessage are assumed to follow
    // the same layout - confirm with callers.
    List<Schema> branches = errors.getTypes();
    int count = branches.size();
    if (count <= 1) {
      return;
    }
    Schema declared = Schema.createUnion(branches.subList(1, count));
    gen.writeFieldName("errors");
    declared.toJson(types, gen);
  }
}
/** The protocol's name; parseName rejects JSON that has no "protocol" key. */
private String name;
/** Optional namespace qualifying the name; stays null when none is supplied. */
private String namespace;
/** Optional documentation string; null when absent. */
private String doc;
/** The named schemas known to this protocol. */
private Schema.Names types = new Schema.Names();
/** Messages keyed by message name, kept in insertion order. */
private Map<String,Message> messages = new LinkedHashMap<String,Message>();
/** Cached MD5 of the JSON text; null until getMD5() computes it. */
private byte[] md5;
/** An error that can be thrown by any message. */
public static final Schema SYSTEM_ERROR = Schema.create(Schema.Type.STRING);
/** Union type for generating system errors. */
public static final Schema SYSTEM_ERRORS;
static {
// A union whose only branch is SYSTEM_ERROR.
List<Schema> errors = new ArrayList<Schema>();
errors.add(SYSTEM_ERROR);
SYSTEM_ERRORS = Schema.createUnion(errors);
}
/**
 * Top-level JSON keys with a built-in meaning in a protocol declaration.
 * parseProps stores every other top-level key as a property.
 */
private static final Set<String> PROTOCOL_RESERVED = new HashSet<String>();
static {
Collections.addAll(PROTOCOL_RESERVED,
"namespace", "protocol", "doc",
"messages","types", "errors");
}
/**
 * Creates an empty protocol with no name, namespace or doc set. Used by the
 * static parse methods, which then fill the instance in from JSON.
 */
private Protocol() {
super(PROTOCOL_RESERVED);
}
/**
 * Creates a protocol with no types or messages.
 *
 * @param name      the protocol name
 * @param doc       documentation string, may be null
 * @param namespace namespace qualifying the name, may be null
 */
public Protocol(String name, String doc, String namespace) {
  super(PROTOCOL_RESERVED);
  this.namespace = namespace;
  this.doc = doc;
  this.name = name;
}
/**
 * Creates a protocol with no doc string, types or messages.
 *
 * @param name      the protocol name
 * @param namespace namespace qualifying the name
 */
public Protocol(String name, String namespace) {
this(name, null, namespace);
}
/** Returns the name of this protocol. */
public String getName() {
  return this.name;
}
/** Returns the namespace that qualifies this protocol's name, or null if none was set. */
public String getNamespace() {
  return this.namespace;
}
/** Returns the documentation string of this protocol, or null if none was set. */
public String getDoc() {
  return this.doc;
}
/** Returns the schemas currently held in this protocol's type table. */
public Collection<Schema> getTypes() {
  Collection<Schema> schemas = this.types.values();
  return schemas;
}
/**
 * Looks a type up by name in this protocol's type table.
 *
 * @param name the name to look up
 * @return whatever the type table returns for that name
 */
public Schema getType(String name) {
  Schema found = this.types.get(name);
  return found;
}
/**
 * Replaces this protocol's types with the given schemas. The type table is
 * swapped for an empty one first, then the schemas are added in iteration
 * order.
 *
 * @param newTypes the schemas to register
 */
public void setTypes(Collection<Schema> newTypes) {
  Schema.Names fresh = new Schema.Names();
  this.types = fresh;
  for (Iterator<Schema> it = newTypes.iterator(); it.hasNext();) {
    fresh.add(it.next());
  }
}
/** Returns the map of messages keyed by name; this is the protocol's own map, not a copy. */
public Map<String,Message> getMessages() {
  return this.messages;
}
/**
 * Create a one-way message with no extra properties. Delegates to the
 * overload that takes a property map, passing an empty map.
 */
@Deprecated
public Message createMessage(String name, String doc, Schema request) {
  Map<String,String> noProps = new LinkedHashMap<String,String>();
  return createMessage(name, doc, noProps, request);
}
/**
 * Create a one-way message.
 *
 * @param name    the message name
 * @param doc     documentation string, may be null
 * @param propMap extra properties to copy onto the message, may be null
 * @param request the schema describing the parameters
 */
public <T> Message createMessage(String name, String doc,
                                 Map<String,T> propMap, Schema request) {
  Message oneWay = this.new Message(name, doc, propMap, request);
  return oneWay;
}
/**
 * Create a two-way message with no extra properties. Delegates to the
 * overload that takes a property map, passing an empty map.
 */
@Deprecated
public Message createMessage(String name, String doc, Schema request,
                             Schema response, Schema errors) {
  Map<String,String> noProps = new LinkedHashMap<String,String>();
  return createMessage(name, doc, noProps, request, response, errors);
}
/**
 * Create a two-way message.
 *
 * @param name     the message name
 * @param doc      documentation string, may be null
 * @param propMap  extra properties to copy onto the message, may be null
 * @param request  the schema describing the parameters
 * @param response the schema of the returned value
 * @param errors   the union of errors the message may raise
 */
public <T> Message createMessage(String name, String doc,
                                 Map<String,T> propMap, Schema request,
                                 Schema response, Schema errors) {
  Message twoWay =
    this.new TwoWayMessage(name, doc, propMap, request, response, errors);
  return twoWay;
}
/**
 * Two protocols are equal when their name, namespace, types, messages and
 * properties are all equal. The doc string is not compared.
 *
 * <p>Name and namespace are compared null-safely: the namespace is optional
 * (a protocol parsed from JSON without a "namespace" key keeps it null), so
 * dereferencing it unconditionally would throw a NullPointerException.
 */
@Override
public boolean equals(Object o) {
  if (o == this) return true;
  if (!(o instanceof Protocol)) return false;
  Protocol that = (Protocol)o;
  return (this.name == null
          ? that.name == null
          : this.name.equals(that.name))
    && (this.namespace == null
        ? that.namespace == null
        : this.namespace.equals(that.namespace))
    && this.types.equals(that.types)
    && this.messages.equals(that.messages)
    && this.props.equals(that.props);
}
/**
 * Hash over name, namespace, types, messages and properties.
 *
 * <p>A null name or namespace contributes 0 instead of throwing; the
 * namespace is optional and is legitimately null for protocols declared
 * without one. For non-null values the result is unchanged.
 */
@Override
public int hashCode() {
  int nameHash = (name == null) ? 0 : name.hashCode();
  int namespaceHash = (namespace == null) ? 0 : namespace.hashCode();
  return nameHash + namespaceHash
    + types.hashCode() + messages.hashCode() + props.hashCode();
}
/** Render this as compact (not pretty-printed) <a href="http://json.org/">JSON</a>. */
@Override
public String toString() {
  final boolean pretty = false;
  return toString(pretty);
}
/**
 * Render this as <a href="http://json.org/">JSON</a>.
 *
 * @param pretty if true, pretty-print JSON.
 * @return the JSON text of this protocol
 * @throws AvroRuntimeException wrapping any IOException raised while writing
 */
public String toString(boolean pretty) {
  StringWriter out = new StringWriter();
  try {
    JsonGenerator gen = Schema.FACTORY.createJsonGenerator(out);
    if (pretty) {
      gen.useDefaultPrettyPrinter();
    }
    toJson(gen);
    gen.flush();
  } catch (IOException e) {
    throw new AvroRuntimeException(e);
  }
  return out.toString();
}
/**
 * Writes this protocol as a JSON object: "protocol", "namespace", optional
 * "doc", the extra properties, then the "types" array and the "messages"
 * object.
 */
void toJson(JsonGenerator gen) throws IOException {
  // Point the type table at this protocol's namespace before anything is written.
  types.space(namespace);

  gen.writeStartObject();
  gen.writeStringField("protocol", name);
  gen.writeStringField("namespace", namespace);
  if (doc != null) {
    gen.writeStringField("doc", doc);
  }
  writeProps(gen);

  gen.writeArrayFieldStart("types");
  // Types already present in this table are skipped.
  // NOTE(review): presumably Schema.toJson records the names it writes here,
  // so a type nested inside an earlier one is not repeated - confirm.
  Schema.Names emitted = new Schema.Names(namespace);
  for (Schema schema : types.values()) {
    if (emitted.contains(schema)) {
      continue;
    }
    schema.toJson(emitted, gen);
  }
  gen.writeEndArray();

  gen.writeObjectFieldStart("messages");
  for (Map.Entry<String,Message> entry : messages.entrySet()) {
    String messageName = entry.getKey();
    Message message = entry.getValue();
    gen.writeFieldName(messageName);
    message.toJson(gen);
  }
  gen.writeEndObject();                         // end of "messages"

  gen.writeEndObject();                         // end of protocol
}
/**
 * Return the MD5 hash of the text of this protocol.
 *
 * <p>The hash is computed from the UTF-8 bytes of {@link #toString()} on the
 * first call and cached; later calls return the same cached array.
 * NOTE(review): nothing visible in this class clears the cache when the
 * protocol is modified afterwards - confirm whether that matters to callers.
 *
 * @throws AvroRuntimeException wrapping any exception raised while hashing
 */
public byte[] getMD5() {
  if (md5 != null) {
    return md5;
  }
  try {
    MessageDigest digester = MessageDigest.getInstance("MD5");
    byte[] text = this.toString().getBytes("UTF-8");
    md5 = digester.digest(text);
  } catch (Exception e) {
    throw new AvroRuntimeException(e);
  }
  return md5;
}
/**
 * Read a protocol from a Json file.
 *
 * <p>The parser created for the file is closed before returning, whether or
 * not parsing succeeds, so the file handle it was opened with is not left
 * open.
 *
 * @param file the file holding the protocol declaration
 * @return the parsed protocol
 * @throws IOException if the file cannot be opened or the parser cannot be closed
 */
public static Protocol parse(File file) throws IOException {
  JsonParser parser = Schema.FACTORY.createJsonParser(file);
  try {
    return parse(parser);
  } finally {
    parser.close();
  }
}
/**
 * Read a protocol from a Json stream.
 *
 * <p>This method does not close the stream itself.
 *
 * @param stream the stream holding the protocol declaration
 * @return the parsed protocol
 * @throws IOException if a parser cannot be created for the stream
 */
public static Protocol parse(InputStream stream) throws IOException {
return parse(Schema.FACTORY.createJsonParser(stream));
}
/**
 * Read a protocol from one or more JSON strings. The strings are
 * concatenated in order, with no separator, and parsed as one document.
 *
 * @param string the first piece of JSON text
 * @param more   further pieces appended after it
 * @return the parsed protocol
 */
public static Protocol parse(String string, String... more) {
  StringBuilder json = new StringBuilder(string);
  for (int i = 0; i < more.length; i++) {
    json.append(more[i]);
  }
  String combined = json.toString();
  return parse(combined);
}
/**
 * Read a protocol from a Json string. The text is encoded as UTF-8 and
 * parsed from an in-memory stream.
 *
 * @param string the JSON text of the protocol
 * @return the parsed protocol
 * @throws AvroRuntimeException wrapping any IOException raised while setting up the parser
 */
public static Protocol parse(String string) {
  try {
    byte[] utf8 = string.getBytes("UTF-8");
    ByteArrayInputStream in = new ByteArrayInputStream(utf8);
    return parse(Schema.FACTORY.createJsonParser(in));
  } catch (IOException e) {
    throw new AvroRuntimeException(e);
  }
}
/**
 * Reads a JSON tree from the parser and builds a protocol from it.
 *
 * @throws SchemaParseException wrapping any IOException raised while reading the tree
 */
private static Protocol parse(JsonParser parser) {
  Protocol protocol = new Protocol();
  JsonNode root;
  try {
    root = Schema.MAPPER.readTree(parser);
  } catch (IOException e) {
    throw new SchemaParseException(e);
  }
  protocol.parse(root);
  return protocol;
}
/**
 * Populates this protocol from a parsed JSON declaration.
 *
 * <p>The steps run in a fixed order: the namespace is read first because it
 * is applied to the type table, types are read before messages because
 * messages resolve their parameter, response and error types against that
 * table.
 */
private void parse(JsonNode json) {
parseNamespace(json);
parseName(json);
parseTypes(json);
parseMessages(json);
parseDoc(json);
parseProps(json);
}
/**
 * Reads the optional "namespace" key. When present, its text value becomes
 * this protocol's namespace and is also applied to the type table; when
 * absent, nothing changes.
 */
private void parseNamespace(JsonNode json) {
  JsonNode namespaceNode = json.get("namespace");
  if (namespaceNode != null) {
    this.namespace = namespaceNode.getTextValue();
    types.space(this.namespace);
  }
}
/** Reads the optional "doc" key of the protocol declaration into this protocol's doc string. */
private void parseDoc(JsonNode json) {
  String parsed = parseDocNode(json);
  this.doc = parsed;
}
/**
 * Extracts the "doc" text from a JSON object.
 *
 * @return the text value of the "doc" key, or null when the key is absent
 */
private String parseDocNode(JsonNode json) {
  JsonNode docNode = json.get("doc");
  return (docNode == null) ? null : docNode.getTextValue();
}
/**
 * Reads the required "protocol" key into this protocol's name.
 *
 * @throws SchemaParseException if the key is missing
 */
private void parseName(JsonNode json) {
  JsonNode protocolNode = json.get("protocol");
  if (protocolNode != null) {
    this.name = protocolNode.getTextValue();
    return;
  }
  throw new SchemaParseException("No protocol name specified: "+json);
}
/**
 * Reads the optional "types" array, parsing each entry against this
 * protocol's type table in declaration order.
 *
 * @throws SchemaParseException if "types" is not an array or an entry is not an object
 */
private void parseTypes(JsonNode json) {
  JsonNode defs = json.get("types");
  if (defs == null) {
    return;                                     // no types defined
  }
  if (!defs.isArray()) {
    throw new SchemaParseException("Types not an array: "+defs);
  }
  Iterator<JsonNode> entries = defs.iterator();
  while (entries.hasNext()) {
    JsonNode type = entries.next();
    if (!type.isObject()) {
      throw new SchemaParseException("Type not an object: "+type);
    }
    Schema.parse(type, types);
  }
}
/** Stores every top-level key that is not in PROTOCOL_RESERVED as a property of this protocol. */
private void parseProps(JsonNode json) {
  Iterator<String> keys = json.getFieldNames();
  while (keys.hasNext()) {
    String key = keys.next();
    if (PROTOCOL_RESERVED.contains(key)) {
      continue;                                 // reserved keys are not props
    }
    this.addProp(key, json.get(key));
  }
}
/**
 * Reads the optional "messages" object, parsing each entry and storing it
 * under its key in this protocol's message map.
 */
private void parseMessages(JsonNode json) {
  JsonNode defs = json.get("messages");
  if (defs == null) {
    return;                                     // no messages defined
  }
  Iterator<String> names = defs.getFieldNames();
  while (names.hasNext()) {
    String messageName = names.next();
    Message parsed = parseMessage(messageName, defs.get(messageName));
    this.messages.put(messageName, parsed);
  }
}
/**
 * Builds a message from its JSON declaration.
 *
 * <p>Keys not listed in MESSAGE_RESERVED are kept as message properties. The
 * "request" array becomes a record schema with one field per parameter. If
 * "one-way" is true, a one-way Message is returned; otherwise a
 * TwoWayMessage is returned whose error union starts with SYSTEM_ERROR,
 * followed by the declared errors.
 *
 * @param messageName the key under which the message was declared
 * @param json        the JSON value of the message declaration
 * @return the parsed message
 * @throws SchemaParseException if the request is missing or not an array, a
 *         parameter lacks a name or type, "one-way" is not a boolean, a
 *         two-way message has no response, a one-way message declares errors
 *         or a non-null response, "errors" is not an array, or a declared
 *         error is undefined or is not an error type
 */
private Message parseMessage(String messageName, JsonNode json) {
String doc = parseDocNode(json);
// Collect the non-reserved keys, in declaration order, as message properties.
Map<String,JsonNode> mProps = new LinkedHashMap<String,JsonNode>();
for (Iterator<String> i = json.getFieldNames(); i.hasNext();) {
String p = i.next(); // add non-reserved as props
if (!MESSAGE_RESERVED.contains(p))
mProps.put(p, json.get(p));
}
// The request must be present and must be an array of parameter declarations.
JsonNode requestNode = json.get("request");
if (requestNode == null || !requestNode.isArray())
throw new SchemaParseException("No request specified: "+json);
List<Field> fields = new ArrayList<Field>();
for (JsonNode field : requestNode) {
// Each parameter needs at least a name and a type.
JsonNode fieldNameNode = field.get("name");
if (fieldNameNode == null)
throw new SchemaParseException("No param name: "+field);
JsonNode fieldTypeNode = field.get("type");
if (fieldTypeNode == null)
throw new SchemaParseException("No param type: "+field);
String name = fieldNameNode.getTextValue();
String fieldDoc = null;
JsonNode fieldDocNode = field.get("doc");
if (fieldDocNode != null)
fieldDoc = fieldDocNode.getTextValue();
// The parameter type is parsed against this protocol's type table.
Field newField = new Field(name, Schema.parse(fieldTypeNode,types),
fieldDoc, field.get("default"));
Set<String> aliases = Schema.parseAliases(field);
if (aliases != null) { // add aliases
for (String alias : aliases)
newField.addAlias(alias);
}
Iterator<String> i = field.getFieldNames();
while (i.hasNext()) { // add properties
String prop = i.next();
if (!FIELD_RESERVED.contains(prop)) // ignore reserved
newField.addProp(prop, field.get(prop));
}
fields.add(newField);
}
Schema request = Schema.createRecord(fields);
// "one-way" defaults to false and, when present, must be a JSON boolean.
boolean oneWay = false;
JsonNode oneWayNode = json.get("one-way");
if (oneWayNode != null) {
if (!oneWayNode.isBoolean())
throw new SchemaParseException("one-way must be boolean: "+json);
oneWay = oneWayNode.getBooleanValue();
}
// A response is mandatory unless the message is one-way.
JsonNode responseNode = json.get("response");
if (!oneWay && responseNode == null)
throw new SchemaParseException("No response specified: "+json);
JsonNode decls = json.get("errors");
if (oneWay) {
// One-way messages may not declare errors, and a response, if given,
// must parse to the NULL type.
if (decls != null)
throw new SchemaParseException("one-way can't have errors: "+json);
if (responseNode != null
&& Schema.parse(responseNode, types).getType() != Schema.Type.NULL)
throw new SchemaParseException("One way response must be null: "+json);
return new Message(messageName, doc, mProps, request);
}
Schema response = Schema.parse(responseNode, types);
// The error union always starts with SYSTEM_ERROR; declared errors follow.
List<Schema> errs = new ArrayList<Schema>();
errs.add(SYSTEM_ERROR); // every method can throw
if (decls != null) {
if (!decls.isArray())
throw new SchemaParseException("Errors not an array: "+json);
for (JsonNode decl : decls) {
// Declared errors are referenced by name and must already be known
// error types in this protocol's type table.
String name = decl.getTextValue();
Schema schema = this.types.get(name);
if (schema == null)
throw new SchemaParseException("Undefined error: "+name);
if (!schema.isError())
throw new SchemaParseException("Not an error: "+name);
errs.add(schema);
}
}
return new TwoWayMessage(messageName, doc, mProps, request, response,
Schema.createUnion(errs));
}
/**
 * Parses the protocol file named by the first command-line argument and
 * prints the protocol to standard output.
 */
public static void main(String[] args) throws Exception {
  File protocolFile = new File(args[0]);
  Protocol protocol = Protocol.parse(protocolFile);
  System.out.println(protocol);
}
}