/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.avro;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.io.StringWriter;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import java.util.HashSet;

import org.apache.avro.Schema.Field;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.node.TextNode;

/** A set of messages forming an application protocol.
 * <p> A protocol consists of:
 * <ul>
 * <li>a <i>name</i> for the protocol;
 * <li>an optional <i>namespace</i>, further qualifying the name;
 * <li>a list of <i>types</i>, or named {@link Schema schemas};
 * <li>a list of <i>errors</i>, or named {@link Schema schemas} for exceptions;
 * <li>a list of named <i>messages</i>, each of which specifies,
 *   <ul>
 *   <li><i>request</i>, the parameter schemas;
 *   <li>one of either;
 *     <ul><li>one-way</li></ul>
 *   or
 *     <ul>
 *       <li><i>response</i>, the response schema;
 *       <li><i>errors</i>, an optional list of potential error schema names.
 *     </ul>
 *   </ul>
 * </ul>
 */
public class Protocol extends JsonProperties {
  /** The version of the protocol specification implemented here. */
  public static final long VERSION = 1;

  // Support properties for both Protocol and Message objects
  private static final Set<String> MESSAGE_RESERVED = new HashSet<String>();
  static {
    Collections.addAll(MESSAGE_RESERVED,
                       "doc", "response","request", "errors", "one-way");
  }

  private static final Set<String> FIELD_RESERVED = new HashSet<String>();
  static {
    Collections.addAll(FIELD_RESERVED,
                       "name", "type", "doc", "default", "aliases");
  }

  /** A protocol message. */
  public class Message extends JsonProperties {
    private String name;
    private String doc;
    private Schema request;

    /** Construct a message. */
    private Message(String name, String doc,
                    Map<String,?> propMap, Schema request) {
      super(MESSAGE_RESERVED);
      this.name = name;
      this.doc = doc;
      this.request = request;

      if (propMap != null)                        // copy props
        for (Map.Entry<String,?> prop : propMap.entrySet()) {
          Object value = prop.getValue();
          this.addProp(prop.getKey(),
                       value instanceof String
                       ? TextNode.valueOf((String)value)
                       : (JsonNode)value);
        }
    }

    /** The name of this message. */
    public String getName() { return name; }
    /** The parameters of this message. */
    public Schema getRequest() { return request; }
    /** The returned data. */
    public Schema getResponse() { return Schema.create(Schema.Type.NULL); }
    /** Errors that might be thrown. */
    public Schema getErrors() {
      return Schema.createUnion(new ArrayList<Schema>());
    }

    /** Returns true if this is a one-way message, with no response or errors.*/
    public boolean isOneWay() { return true; }

    public String toString() {
      try {
        StringWriter writer = new StringWriter();
        JsonGenerator gen = Schema.FACTORY.createJsonGenerator(writer);
        toJson(gen);
        gen.flush();
        return writer.toString();
      } catch (IOException e) {
        throw new AvroRuntimeException(e);
      }
    }
    void toJson(JsonGenerator gen) throws IOException {
      gen.writeStartObject();
      if (doc != null) gen.writeStringField("doc", doc);
      writeProps(gen);                           // write out properties
      gen.writeFieldName("request");
      request.fieldsToJson(types, gen);

      toJson1(gen);
      gen.writeEndObject();
    }

    void toJson1(JsonGenerator gen) throws IOException {
      gen.writeStringField("response", "null");
      gen.writeBooleanField("one-way", true);
    }

    public boolean equals(Object o) {
      if (o == this) return true;
      if (!(o instanceof Message)) return false;
      Message that = (Message)o;
      return this.name.equals(that.name)
        && this.request.equals(that.request)
        && props.equals(that.props);
    }

    public int hashCode() {
      return name.hashCode() + request.hashCode() + props.hashCode();
    }

    public String getDoc() { return doc; }

  }

  private class TwoWayMessage extends Message {
    private Schema response;
    private Schema errors;

    /** Construct a message. */
    private TwoWayMessage(String name, String doc, Map<String,?> propMap,
                          Schema request, Schema response, Schema errors) {
      super(name, doc, propMap, request);
      this.response = response;
      this.errors = errors;
    }

    @Override public Schema getResponse() { return response; }
    @Override public Schema getErrors() { return errors; }
    @Override public boolean isOneWay() { return false; }

    @Override public boolean equals(Object o) {
      if (!super.equals(o)) return false;
      if (!(o instanceof TwoWayMessage)) return false;
      TwoWayMessage that = (TwoWayMessage)o;
      return this.response.equals(that.response)
        && this.errors.equals(that.errors);
    }

    @Override public int hashCode() {
      return super.hashCode() + response.hashCode() + errors.hashCode();
    }

    @Override void toJson1(JsonGenerator gen) throws IOException {
      gen.writeFieldName("response");
      response.toJson(types, gen);

      List<Schema> errs = errors.getTypes();  // elide system error
      if (errs.size() > 1) {
        Schema union = Schema.createUnion(errs.subList(1, errs.size()));
        gen.writeFieldName("errors");
        union.toJson(types, gen);
      }
    }

  }

  private String name;
  private String namespace;
  private String doc;

  private Schema.Names types = new Schema.Names();
  private Map<String,Message> messages = new LinkedHashMap<String,Message>();
  private byte[] md5;

  /** An error that can be thrown by any message. */
  public static final Schema SYSTEM_ERROR = Schema.create(Schema.Type.STRING);

  /** Union type for generating system errors. */
  public static final Schema SYSTEM_ERRORS;
  static {
    List<Schema> errors = new ArrayList<Schema>();
    errors.add(SYSTEM_ERROR);
    SYSTEM_ERRORS = Schema.createUnion(errors);
  }

  private static final Set<String> PROTOCOL_RESERVED = new HashSet<String>();
  static {
    Collections.addAll(PROTOCOL_RESERVED,
       "namespace", "protocol", "doc",
       "messages","types", "errors");
  }

  private Protocol() {
    super(PROTOCOL_RESERVED);
  }

  public Protocol(String name, String doc, String namespace) {
    super(PROTOCOL_RESERVED);
    this.name = name;
    this.doc = doc;
    this.namespace = namespace;
  }
  public Protocol(String name, String namespace) {
    this(name, null, namespace);
  }

  /** The name of this protocol. */
  public String getName() { return name; }

  /** The namespace of this protocol.  Qualifies its name. */
  public String getNamespace() { return namespace; }

  /** Doc string for this protocol. */
  public String getDoc() { return doc; }

  /** The types of this protocol. */
  public Collection<Schema> getTypes() { return types.values(); }

  /** Returns the named type. */
  public Schema getType(String name) { return types.get(name); }

  /** Set the types of this protocol. */
  public void setTypes(Collection<Schema> newTypes) {
    types = new Schema.Names();
    for (Schema s : newTypes)
      types.add(s);
  }

  /** The messages of this protocol. */
  public Map<String,Message> getMessages() { return messages; }

  /** Create a one-way message. */
  @Deprecated
  public Message createMessage(String name, String doc, Schema request) {
    return createMessage(name, doc, new LinkedHashMap<String,String>(),request);
  }
  /** Create a one-way message. */
  public <T> Message createMessage(String name, String doc,
                                   Map<String,T> propMap, Schema request) {
    return new Message(name, doc, propMap, request);
  }

  /** Create a two-way message. */
  @Deprecated
  public Message createMessage(String name, String doc, Schema request,
                               Schema response, Schema errors) {
    return createMessage(name, doc, new LinkedHashMap<String,String>(),
                         request, response, errors);
  }
  /** Create a two-way message. */
  public <T> Message createMessage(String name, String doc,
                                   Map<String,T> propMap, Schema request,
                                   Schema response, Schema errors) {
    return new TwoWayMessage(name, doc, propMap, request, response, errors);
  }

  public boolean equals(Object o) {
    if (o == this) return true;
    if (!(o instanceof Protocol)) return false;
    Protocol that = (Protocol)o;
    return this.name.equals(that.name)
      && this.namespace.equals(that.namespace)
      && this.types.equals(that.types)
      && this.messages.equals(that.messages)
      && this.props.equals(that.props);
  }

  public int hashCode() {
    return name.hashCode() + namespace.hashCode()
      + types.hashCode() + messages.hashCode() + props.hashCode();
  }

  /** Render this as <a href="http://json.org/">JSON</a>.*/
  @Override
  public String toString() { return toString(false); }

  /** Render this as <a href="http://json.org/">JSON</a>.
   * @param pretty if true, pretty-print JSON.
   */
  public String toString(boolean pretty) {
    try {
      StringWriter writer = new StringWriter();
      JsonGenerator gen = Schema.FACTORY.createJsonGenerator(writer);
      if (pretty) gen.useDefaultPrettyPrinter();
      toJson(gen);
      gen.flush();
      return writer.toString();
    } catch (IOException e) {
      throw new AvroRuntimeException(e);
    }
  }
  void toJson(JsonGenerator gen) throws IOException {
    types.space(namespace);

    gen.writeStartObject();
    gen.writeStringField("protocol", name);
    gen.writeStringField("namespace", namespace);

    if (doc != null) gen.writeStringField("doc", doc);
    writeProps(gen);
    gen.writeArrayFieldStart("types");
    Schema.Names resolved = new Schema.Names(namespace);
    for (Schema type : types.values())
      if (!resolved.contains(type))
        type.toJson(resolved, gen);
    gen.writeEndArray();

    gen.writeObjectFieldStart("messages");
    for (Map.Entry<String,Message> e : messages.entrySet()) {
      gen.writeFieldName(e.getKey());
      e.getValue().toJson(gen);
    }
    gen.writeEndObject();
    gen.writeEndObject();
  }

  /** Return the MD5 hash of the text of this protocol. */
  public byte[] getMD5() {
    if (md5 == null)
      try {
        md5 = MessageDigest.getInstance("MD5")
          .digest(this.toString().getBytes("UTF-8"));
      } catch (Exception e) {
        throw new AvroRuntimeException(e);
      }
    return md5;
  }

  /** Read a protocol from a Json file. */
  public static Protocol parse(File file) throws IOException {
    return parse(Schema.FACTORY.createJsonParser(file));
  }

  /** Read a protocol from a Json stream. */
  public static Protocol parse(InputStream stream) throws IOException {
    return parse(Schema.FACTORY.createJsonParser(stream));
  }

  /** Read a protocol from one or more json strings */
  public static Protocol parse(String string, String... more) {
    StringBuilder b = new StringBuilder(string);
    for (String part : more)
      b.append(part);
    return parse(b.toString());
  }

  /** Read a protocol from a Json string. */
  public static Protocol parse(String string) {
    try {
      return parse(Schema.FACTORY.createJsonParser
                   (new ByteArrayInputStream(string.getBytes("UTF-8"))));
    } catch (IOException e) {
      throw new AvroRuntimeException(e);
    }
  }

  private static Protocol parse(JsonParser parser) {
    try {
      Protocol protocol = new Protocol();
      protocol.parse(Schema.MAPPER.readTree(parser));
      return protocol;
    } catch (IOException e) {
      throw new SchemaParseException(e);
    }
  }

  private void parse(JsonNode json) {
    parseNamespace(json);
    parseName(json);
    parseTypes(json);
    parseMessages(json);
    parseDoc(json);
    parseProps(json);
  }

  private void parseNamespace(JsonNode json) {
    JsonNode nameNode = json.get("namespace");
    if (nameNode == null) return;                 // no namespace defined
    this.namespace = nameNode.getTextValue();
    types.space(this.namespace);
  }

  private void parseDoc(JsonNode json) {
    this.doc = parseDocNode(json);
  }

  private String parseDocNode(JsonNode json) {
    JsonNode nameNode = json.get("doc");
    if (nameNode == null) return null;                 // no doc defined
    return nameNode.getTextValue();
  }

  private void parseName(JsonNode json) {
    JsonNode nameNode = json.get("protocol");
    if (nameNode == null)
      throw new SchemaParseException("No protocol name specified: "+json);
    this.name = nameNode.getTextValue();
  }

  private void parseTypes(JsonNode json) {
    JsonNode defs = json.get("types");
    if (defs == null) return;                    // no types defined
    if (!defs.isArray())
      throw new SchemaParseException("Types not an array: "+defs);
    for (JsonNode type : defs) {
      if (!type.isObject())
        throw new SchemaParseException("Type not an object: "+type);
      Schema.parse(type, types);
    }
  }

  private void parseProps(JsonNode json) {
    for (Iterator<String> i = json.getFieldNames(); i.hasNext();) {
      String p = i.next();                        // add non-reserved as props
      if (!PROTOCOL_RESERVED.contains(p))
        this.addProp(p, json.get(p));
    }
  }

  private void parseMessages(JsonNode json) {
    JsonNode defs = json.get("messages");
    if (defs == null) return;                    // no messages defined
    for (Iterator<String> i = defs.getFieldNames(); i.hasNext();) {
      String prop = i.next();
      this.messages.put(prop, parseMessage(prop, defs.get(prop)));
    }
  }

  private Message parseMessage(String messageName, JsonNode json) {
    String doc = parseDocNode(json);

    Map<String,JsonNode> mProps = new LinkedHashMap<String,JsonNode>();
    for (Iterator<String> i = json.getFieldNames(); i.hasNext();) {
      String p = i.next();                        // add non-reserved as props
      if (!MESSAGE_RESERVED.contains(p))
        mProps.put(p, json.get(p));
    }

    JsonNode requestNode = json.get("request");
    if (requestNode == null || !requestNode.isArray())
      throw new SchemaParseException("No request specified: "+json);
    List<Field> fields = new ArrayList<Field>();
    for (JsonNode field : requestNode) {
      JsonNode fieldNameNode = field.get("name");
      if (fieldNameNode == null)
        throw new SchemaParseException("No param name: "+field);
      JsonNode fieldTypeNode = field.get("type");
      if (fieldTypeNode == null)
        throw new SchemaParseException("No param type: "+field);
      String name = fieldNameNode.getTextValue();
      String fieldDoc = null;
      JsonNode fieldDocNode = field.get("doc");
      if (fieldDocNode != null)
        fieldDoc = fieldDocNode.getTextValue();
      Field newField = new Field(name, Schema.parse(fieldTypeNode,types),
                                 fieldDoc, field.get("default"));
      Set<String> aliases = Schema.parseAliases(field);
      if (aliases != null) {                      // add aliases
        for (String alias : aliases)
          newField.addAlias(alias);
      }

      Iterator<String> i = field.getFieldNames();
      while (i.hasNext()) {                       // add properties
        String prop = i.next();
        if (!FIELD_RESERVED.contains(prop))      // ignore reserved
          newField.addProp(prop, field.get(prop));
      }
      fields.add(newField);
    }
    Schema request = Schema.createRecord(fields);

    boolean oneWay = false;
    JsonNode oneWayNode = json.get("one-way");
    if (oneWayNode != null) {
      if (!oneWayNode.isBoolean())
        throw new SchemaParseException("one-way must be boolean: "+json);
      oneWay = oneWayNode.getBooleanValue();
    }

    JsonNode responseNode = json.get("response");
    if (!oneWay && responseNode == null)
      throw new SchemaParseException("No response specified: "+json);

    JsonNode decls = json.get("errors");

    if (oneWay) {
      if (decls != null)
        throw new SchemaParseException("one-way can't have errors: "+json);
      if (responseNode != null
          && Schema.parse(responseNode, types).getType() != Schema.Type.NULL)
        throw new SchemaParseException("One way response must be null: "+json);
      return new Message(messageName, doc, mProps, request);
    }

    Schema response = Schema.parse(responseNode, types);

    List<Schema> errs = new ArrayList<Schema>();
    errs.add(SYSTEM_ERROR);                       // every method can throw
    if (decls != null) {
      if (!decls.isArray())
        throw new SchemaParseException("Errors not an array: "+json);
      for (JsonNode decl : decls) {
        String name = decl.getTextValue();
        Schema schema = this.types.get(name);
        if (schema == null)
          throw new SchemaParseException("Undefined error: "+name);
        if (!schema.isError())
          throw new SchemaParseException("Not an error: "+name);
        errs.add(schema);
      }
    }

    return new TwoWayMessage(messageName, doc, mProps, request, response,
                             Schema.createUnion(errs));
  }

  public static void main(String[] args) throws Exception {
    System.out.println(Protocol.parse(new File(args[0])));
  }

}

