/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.accumulo.testing.randomwalk;

import static java.nio.charset.StandardCharsets.UTF_8;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;

import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.util.SimpleThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

/**
 * A module is directed graph of tests
 */
public class Module extends Node {

  private static final Logger log = LoggerFactory.getLogger(Module.class);

  private class Dummy extends Node {

    String name;

    Dummy(String name) {
      this.name = name;
    }

    @Override
    public void visit(State state, RandWalkEnv env, Properties props) {
      String print;
      if ((print = props.getProperty("print")) != null) {
        switch (print) {
          case "TRACE":
            log.trace(name);
            break;
          case "DEBUG":
            log.debug(name);
            break;
          case "INFO":
            log.info(name);
            break;
          case "WARN":
            log.warn(name);
            break;
          case "ERROR":
            log.error(name);
            break;
          default:
            log.info(name);
            break;
        }
      }
    }

    @Override
    public String toString() {
      return name;
    }
  }

  private class Alias extends Node {

    Node target;
    String targetId;
    String id;

    Alias(String id) {
      target = null;
      this.id = id;
    }

    @Override
    public void visit(State state, RandWalkEnv env, Properties props) throws Exception {
      throw new Exception("You don't visit aliases!");
    }

    @Override
    public String toString() {
      return id;
    }

    public void update(String node) throws Exception {
      targetId = node;
      target = getNode(node);
    }

    public Node get() {
      return target;
    }

    public String getTargetId() {
      return targetId;
    }
  }

  private HashMap<String,Node> nodes = new HashMap<>();
  private HashMap<String,Properties> localProps = new HashMap<>();

  private class Edge {
    String nodeId;
    int weight;
  }

  private class AdjList {

    private List<Edge> edges = new ArrayList<>();
    private int totalWeight = 0;
    private Random rand = new Random();

    /**
     * Adds a neighbor node and weight of edge
     */
    private void addEdge(String nodeId, int weight) {

      totalWeight += weight;

      Edge e = new Edge();
      e.nodeId = nodeId;
      e.weight = weight;
      edges.add(e);
    }

    /**
     * Chooses a random neighbor node
     *
     * @return Node or null if no edges
     */
    private String randomNeighbor() throws Exception {

      String nodeId = null;
      rand = new Random();

      int randNum = rand.nextInt(totalWeight) + 1;
      int sum = 0;

      for (Edge e : edges) {
        nodeId = e.nodeId;
        sum += e.weight;
        if (randNum <= sum) {
          break;
        }
      }
      return nodeId;
    }
  }

  private HashMap<String,String> prefixes = new HashMap<>();
  private HashMap<String,AdjList> adjMap = new HashMap<>();
  private HashMap<String,Set<String>> aliasMap = new HashMap<>();
  private final String id;
  private String initNodeId;
  private Fixture fixture = null;

  public Module(String id) throws Exception {
    this.id = id;
    loadFromXml();
  }

  @Override
  public void visit(final State state, final RandWalkEnv env, Properties props) throws Exception {
    int maxHops, maxSec;
    boolean teardown;

    Properties initProps = getProps("_init");
    initProps.putAll(props);
    String prop;
    if ((prop = initProps.getProperty("maxHops")) == null || prop.equals("0") || prop.equals(""))
      maxHops = Integer.MAX_VALUE;
    else
      maxHops = Integer.parseInt(initProps.getProperty("maxHops", "0"));

    if ((prop = initProps.getProperty("maxSec")) == null || prop.equals("0") || prop.equals(""))
      maxSec = Integer.MAX_VALUE;
    else
      maxSec = Integer.parseInt(initProps.getProperty("maxSec", "0"));

    if ((prop = initProps.getProperty("teardown")) == null || prop.equals("true") || prop.equals(""))
      teardown = true;
    else
      teardown = false;

    if (fixture != null) {
      fixture.setUp(state, env);
    }

    ExecutorService service = new SimpleThreadPool(1, "RandomWalk Runner");

    try {
      Node initNode = getNode(initNodeId);

      boolean test = false;
      if (initNode instanceof Test) {
        startTimer(initNode);
        test = true;
      }
      initNode.visit(state, env, getProps(initNodeId));
      if (test)
        stopTimer(initNode);

      // update aliases
      Set<String> aliases;
      if ((aliases = aliasMap.get(initNodeId)) != null)
        for (String alias : aliases) {
          ((Alias) nodes.get(alias)).update(initNodeId);
        }

      String curNodeId = initNodeId;
      int numHops = 0;
      long startTime = System.currentTimeMillis() / 1000;
      while (true) {
        // check if END state was reached
        if (curNodeId.equalsIgnoreCase("END")) {
          log.debug("reached END state");
          break;
        }
        // check if maxSec was reached
        long curTime = System.currentTimeMillis() / 1000;
        if ((curTime - startTime) > maxSec) {
          log.debug("reached maxSec(" + maxSec + ")");
          break;
        }

        // The number of seconds before the test should exit
        long secondsRemaining = maxSec - (curTime - startTime);

        // check if maxHops was reached
        if (numHops > maxHops) {
          log.debug("reached maxHops(" + maxHops + ")");
          break;
        }
        numHops++;

        if (!adjMap.containsKey(curNodeId) && !curNodeId.startsWith("alias.")) {
          throw new Exception("Reached node(" + curNodeId + ") without outgoing edges in module(" + this + ")");
        }
        AdjList adj = adjMap.get(curNodeId);
        String nextNodeId = adj.randomNeighbor();
        final Node nextNode;
        Node nextNodeOrAlias = getNode(nextNodeId);
        if (nextNodeOrAlias instanceof Alias) {
          nextNodeId = ((Alias) nextNodeOrAlias).getTargetId();
          nextNode = ((Alias) nextNodeOrAlias).get();
        } else {
          nextNode = nextNodeOrAlias;
        }
        final Properties nodeProps = getProps(nextNodeId);
        try {
          test = false;
          if (nextNode instanceof Test) {
            startTimer(nextNode);
            test = true;
          }

          // Wrap the visit of the next node in the module in a
          // callable that returns a thrown exception
          FutureTask<Exception> task = new FutureTask<>(new Callable<Exception>() {

            @Override
            public Exception call() throws Exception {
              try {
                nextNode.visit(state, env, nodeProps);
                return null;
              } catch (Exception e) {
                return e;
              }
            }

          });

          // Run the task (should execute immediately)
          service.submit(task);

          Exception nodeException;
          try {
            // Bound the time we'll wait for the node to complete
            nodeException = task.get(secondsRemaining, TimeUnit.SECONDS);
          } catch (InterruptedException e) {
            log.warn("Interrupted waiting for " + nextNode.getClass().getSimpleName() + " to complete. Exiting.", e);
            break;
          } catch (ExecutionException e) {
            log.error("Caught error executing " + nextNode.getClass().getSimpleName(), e);
            throw e;
          } catch (TimeoutException e) {
            log.info("Timed out waiting for " + nextNode.getClass().getSimpleName() + " to complete (waited " + secondsRemaining + " seconds). Exiting.", e);
            break;
          }

          // The RandomWalk node throw an Exception that that Callable
          // handed back
          // Throw it and let the Module perform cleanup
          if (null != nodeException) {
            throw nodeException;
          }

          if (test)
            stopTimer(nextNode);
        } catch (Exception e) {
          log.debug("AccumuloClient belongs to user: " + env.getAccumuloClient().whoami());
          log.debug("Exception occured at: " + System.currentTimeMillis());
          log.debug("Properties for node: " + nextNodeId);
          for (Entry<Object,Object> entry : nodeProps.entrySet()) {
            log.debug("  " + entry.getKey() + ": " + entry.getValue());
          }
          log.debug("Overall Configuration Properties");
          for (Entry<Object,Object> entry : env.getTestProperties().entrySet()) {
            log.debug("  " + entry.getKey() + ": " + entry.getValue());
          }
          log.debug("State information");
          for (String key : new TreeSet<>(state.getMap().keySet())) {
            Object value = state.getMap().get(key);
            String logMsg = "  " + key + ": ";
            if (value == null)
              logMsg += "null";
            else if (value instanceof String || value instanceof Map || value instanceof Collection || value instanceof Number)
              logMsg += value;
            else if (value instanceof byte[])
              logMsg += new String((byte[]) value, UTF_8);
            else if (value instanceof PasswordToken)
              logMsg += new String(((PasswordToken) value).getPassword(), UTF_8);
            else
              logMsg += value.getClass() + " - " + value;

            log.debug(logMsg);
          }
          throw new Exception("Error running node " + nextNodeId, e);
        }

        // update aliases
        if ((aliases = aliasMap.get(curNodeId)) != null)
          for (String alias : aliases) {
            ((Alias) nodes.get(alias)).update(curNodeId);
          }

        curNodeId = nextNodeId;
      }
    } finally {
      if (null != service) {
        service.shutdownNow();
      }
    }

    if (teardown && (fixture != null)) {
      log.debug("tearing down module");
      fixture.tearDown(state, env);
    }
  }

  Thread timer = null;
  final int time = 5 * 1000 * 60;
  AtomicBoolean runningLong = new AtomicBoolean(false);
  long systemTime;

  /**
   *
   */
  private void startTimer(final Node initNode) {
    runningLong.set(false);
    timer = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          systemTime = System.currentTimeMillis();
          Thread.sleep(time);
        } catch (InterruptedException ie) {
          return;
        }
        long timeSinceLastProgress = System.currentTimeMillis() - initNode.lastProgress();
        if (timeSinceLastProgress > time) {
          log.warn("Node " + initNode + " has been running for " + timeSinceLastProgress / 1000.0 + " seconds. You may want to look into it.");
          runningLong.set(true);
        }
      }
    });
    initNode.makingProgress();
    timer.start();
  }

  /**
   *
   */
  private void stopTimer(Node nextNode) {
    synchronized (timer) {
      timer.interrupt();
      try {
        timer.join();
      } catch (InterruptedException e) {
        log.error("Failed to join timer '" + timer.getName() + "'.", e);
      }
    }
    if (runningLong.get())
      log.warn("Node " + nextNode + ", which was running long, has now completed after " + (System.currentTimeMillis() - systemTime) / 1000.0 + " seconds");
  }

  @Override
  public String toString() {
    return id;
  }

  private String getFullName(String name) {

    int index = name.indexOf(".");
    if (index == -1 || name.endsWith(".xml")) {
      return name;
    }

    String id = name.substring(0, index);

    if (!prefixes.containsKey(id)) {
      log.warn("Id (" + id + ") was not found in prefixes");
      return name;
    }

    return prefixes.get(id).concat(name.substring(index + 1));
  }

  private Node createNode(String id, String src) throws Exception {

    // check if id indicates dummy node
    if (id.equalsIgnoreCase("END") || id.startsWith("dummy")) {
      if (nodes.containsKey(id) == false) {
        nodes.put(id, new Dummy(id));
      }
      return nodes.get(id);
    }

    if (id.startsWith("alias")) {
      if (nodes.containsKey(id) == false) {
        nodes.put(id, new Alias(id));
      }
      return nodes.get(id);
    }

    // grab node from framework based on its id or src
    Node node;
    if (src == null || src.isEmpty()) {
      node = Framework.getInstance().getNode(getFullName(id));
    } else {
      node = Framework.getInstance().getNode(getFullName(src));
    }

    // add to node to this module's map
    nodes.put(id, node);

    return node;
  }

  private Node getNode(String id) throws Exception {

    if (nodes.containsKey(id)) {
      return nodes.get(id);
    }

    if (id.equalsIgnoreCase("END")) {
      nodes.put(id, new Dummy(id));
      return nodes.get(id);
    }

    return Framework.getInstance().getNode(getFullName(id));
  }

  private Properties getProps(String nodeId) {
    if (localProps.containsKey(nodeId)) {
      return localProps.get(nodeId);
    }
    return new Properties();
  }

  private void loadFromXml() throws Exception {
    DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
    DocumentBuilder docbuilder;
    Document d = null;

    // set the schema
    SchemaFactory sf = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
    Schema moduleSchema = sf.newSchema(this.getClass().getClassLoader().getResource("randomwalk/module.xsd"));
    dbf.setSchema(moduleSchema);

    // parse the document

    try {
      docbuilder = dbf.newDocumentBuilder();
      d = docbuilder.parse(this.getClass().getResourceAsStream("/randomwalk/modules/" + id));
    } catch (Exception e) {
      log.error("Failed to parse xml at randomwalk/modules/" + id, e);
      throw new Exception("Failed to parse xml at randomwalk/modules/" + id);
    }

    // parse packages
    NodeList nodelist = d.getDocumentElement().getElementsByTagName("package");
    for (int i = 0; i < nodelist.getLength(); i++) {
      Element el = (Element) nodelist.item(i);
      String value = el.getAttribute("value");
      if (!value.endsWith(".")) {
        value = value.concat(".");
      }
      prefixes.put(el.getAttribute("prefix"), value);
    }

    // parse fixture node
    nodelist = d.getDocumentElement().getElementsByTagName("fixture");
    if (nodelist.getLength() > 0) {
      Element fixtureEl = (Element) nodelist.item(0);
      fixture = (Fixture) Class.forName(getFullName(fixtureEl.getAttribute("id"))).newInstance();
    }

    // parse initial node
    Element initEl = (Element) d.getDocumentElement().getElementsByTagName("init").item(0);
    initNodeId = initEl.getAttribute("id");
    Properties initProps = new Properties();
    String attr = initEl.getAttribute("maxHops");

    if (attr != null)
      initProps.setProperty("maxHops", attr);
    attr = initEl.getAttribute("maxSec");

    if (attr != null)
      initProps.setProperty("maxSec", attr);
    attr = initEl.getAttribute("teardown");

    if (attr != null)
      initProps.setProperty("teardown", attr);
    localProps.put("_init", initProps);

    // parse all nodes
    nodelist = d.getDocumentElement().getElementsByTagName("node");
    for (int i = 0; i < nodelist.getLength(); i++) {

      Element nodeEl = (Element) nodelist.item(i);

      // get attributes
      String id = nodeEl.getAttribute("id");
      if (adjMap.containsKey(id)) {
        throw new Exception("Module already contains: " + id);
      }
      String src = nodeEl.getAttribute("src");

      // create node
      createNode(id, src);

      // set some attributes in properties for later use
      Properties props = new Properties();
      props.setProperty("maxHops", nodeEl.getAttribute("maxHops"));
      props.setProperty("maxSec", nodeEl.getAttribute("maxSec"));
      props.setProperty("teardown", nodeEl.getAttribute("teardown"));

      // parse aliases
      NodeList aliaslist = nodeEl.getElementsByTagName("alias");
      Set<String> aliases = new TreeSet<>();
      for (int j = 0; j < aliaslist.getLength(); j++) {
        Element propEl = (Element) aliaslist.item(j);

        if (!propEl.hasAttribute("name")) {
          throw new Exception("Node " + id + " has alias with no identifying name");
        }

        String key = "alias." + propEl.getAttribute("name");

        aliases.add(key);
        createNode(key, null);
      }
      if (aliases.size() > 0)
        aliasMap.put(id, aliases);

      // parse properties of nodes
      NodeList proplist = nodeEl.getElementsByTagName("property");
      for (int j = 0; j < proplist.getLength(); j++) {
        Element propEl = (Element) proplist.item(j);

        if (!propEl.hasAttribute("key") || !propEl.hasAttribute("value")) {
          throw new Exception("Node " + id + " has property with no key or value");
        }

        String key = propEl.getAttribute("key");

        if (key.equals("maxHops") || key.equals("maxSec") || key.equals("teardown")) {
          throw new Exception("The following property can only be set in attributes: " + key);
        }

        props.setProperty(key, propEl.getAttribute("value"));
      }
      localProps.put(id, props);

      // parse edges of nodes
      AdjList edges = new AdjList();
      adjMap.put(id, edges);
      NodeList edgelist = nodeEl.getElementsByTagName("edge");
      if (edgelist.getLength() == 0) {
        throw new Exception("Node " + id + " has no edges!");
      }
      for (int j = 0; j < edgelist.getLength(); j++) {
        Element edgeEl = (Element) edgelist.item(j);

        String edgeID = edgeEl.getAttribute("id");

        if (!edgeEl.hasAttribute("weight")) {
          throw new Exception("Edge with id=" + edgeID + " is missing weight");
        }

        int weight = Integer.parseInt(edgeEl.getAttribute("weight"));
        edges.addEdge(edgeID, weight);
      }
    }
  }
}
