/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.history.parser.datamodel;

import org.apache.tez.common.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.LinkedHashMultimap;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Ordering;
import org.apache.commons.collections4.BidiMap;
import org.apache.commons.collections4.bidimap.DualHashBidiMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.client.CallerContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.history.HistoryEventType;
import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import static org.apache.hadoop.classification.InterfaceAudience.Public;
import static org.apache.hadoop.classification.InterfaceStability.Evolving;

@Public
@Evolving
public class DagInfo extends BaseInfo {

  private static final Log LOG = LogFactory.getLog(DagInfo.class);

  //Fields populated via JSON
  private final String name;
  private final long startTime;
  private final long endTime;
  private final long submitTime;
  private final int failedTasks;
  private final String dagId;
  private final int numVertices;
  private final String status;
  private final String diagnostics;
  private String userName;
  private VersionInfo versionInfo;
  private CallerContext callerContext;

  //VertexID --> VertexName & vice versa
  private final BidiMap<String, String> vertexNameIDMapping;

  //edgeId to EdgeInfo mapping
  private final Map<Integer, EdgeInfo> edgeInfoMap;

  //Only for internal parsing (vertexname mapping)
  private Map<String, BasicVertexInfo> basicVertexInfoMap;

  //VertexName --> VertexInfo
  private Map<String, VertexInfo> vertexNameMap;

  private Multimap<Container, TaskAttemptInfo> containerMapping;
  private Map<String, String> config;

  DagInfo(JSONObject jsonObject) throws JSONException {
    super(jsonObject);

    vertexNameMap = Maps.newHashMap();
    vertexNameIDMapping = new DualHashBidiMap<>();
    edgeInfoMap = Maps.newHashMap();
    basicVertexInfoMap = Maps.newHashMap();
    containerMapping = LinkedHashMultimap.create();

    Preconditions.checkArgument(jsonObject.getString(Constants.ENTITY_TYPE).equalsIgnoreCase
        (Constants.TEZ_DAG_ID));

    dagId = StringInterner.weakIntern(jsonObject.getString(Constants.ENTITY));

    //Parse additional Info
    JSONObject otherInfoNode = jsonObject.getJSONObject(Constants.OTHER_INFO);

    long sTime = otherInfoNode.optLong(Constants.START_TIME);
    long eTime= otherInfoNode.optLong(Constants.FINISH_TIME);
    userName = otherInfoNode.optString(Constants.USER);
    if (eTime < sTime) {
      LOG.warn("DAG has got wrong start/end values. "
          + "startTime=" + sTime + ", endTime=" + eTime + ". Will check "
          + "timestamps in DAG started/finished events");

      // Check if events DAG_STARTED, DAG_FINISHED can be made use of
      for(Event event : eventList) {
        switch (HistoryEventType.valueOf(event.getType())) {
        case DAG_STARTED:
          sTime = event.getAbsoluteTime();
          break;
        case DAG_FINISHED:
          eTime = event.getAbsoluteTime();
          break;
        default:
          break;
        }
      }

      if (eTime < sTime) {
        LOG.warn("DAG has got wrong start/end values in events as well. "
            + "startTime=" + sTime + ", endTime=" + eTime);
      }
    }
    startTime = sTime;
    endTime = eTime;

    //TODO: Not getting populated correctly for lots of jobs.  Verify
    submitTime = otherInfoNode.optLong(Constants.START_REQUESTED_TIME);
    diagnostics = otherInfoNode.optString(Constants.DIAGNOSTICS);
    failedTasks = otherInfoNode.optInt(Constants.NUM_FAILED_TASKS);
    JSONObject dagPlan = otherInfoNode.optJSONObject(Constants.DAG_PLAN);
    name = StringInterner.weakIntern((dagPlan != null) ? (dagPlan.optString(Constants.DAG_NAME)) : null);
    if (dagPlan != null) {
      JSONArray vertices = dagPlan.optJSONArray(Constants.VERTICES);
      if (vertices != null) {
        numVertices = vertices.length();
      } else {
        numVertices = 0;
      }
      parseDAGPlan(dagPlan);
    } else {
      numVertices = 0;
    }
    status = StringInterner.weakIntern(otherInfoNode.optString(Constants.STATUS));

    //parse name id mapping
    JSONObject vertexIDMappingJson = otherInfoNode.optJSONObject(Constants.VERTEX_NAME_ID_MAPPING);
    if (vertexIDMappingJson != null) {
      //get vertex name
      for (Map.Entry<String, BasicVertexInfo> entry : basicVertexInfoMap.entrySet()) {
        String vertexId = vertexIDMappingJson.optString(entry.getKey());
        //vertexName --> vertexId
        vertexNameIDMapping.put(entry.getKey(), vertexId);
      }
    }
  }

  public static DagInfo create(JSONObject jsonObject) throws JSONException {
    DagInfo dagInfo = new DagInfo(jsonObject);
    return dagInfo;
  }

  private void parseDAGPlan(JSONObject dagPlan) throws JSONException {
    int version = dagPlan.optInt(Constants.VERSION, 1);
    parseEdges(dagPlan.optJSONArray(Constants.EDGES));

    JSONArray verticesInfo = dagPlan.optJSONArray(Constants.VERTICES);
    parseBasicVertexInfo(verticesInfo);

    if (version > 1) {
      parseDAGContext(dagPlan.optJSONObject(Constants.DAG_CONTEXT));
    }
  }

  private void parseDAGContext(JSONObject callerContextInfo) {
    if (callerContextInfo == null) {
      LOG.info("No DAG Caller Context available");
      return;
    }
    String context = callerContextInfo.optString(Constants.CONTEXT);
    String callerId = callerContextInfo.optString(Constants.CALLER_ID);
    String callerType = callerContextInfo.optString(Constants.CALLER_TYPE);
    String description = callerContextInfo.optString(Constants.DESCRIPTION);

    this.callerContext = CallerContext.create(context, description);
    if (callerId != null && !callerId.isEmpty() && callerType != null && !callerType.isEmpty()) {
      this.callerContext.setCallerIdAndType(callerId, callerType);
    } else {
      LOG.info("No DAG Caller Context Id and Type available");
    }

  }

  private void parseBasicVertexInfo(JSONArray verticesInfo) throws JSONException {
    if (verticesInfo == null) {
      LOG.info("No vertices available.");
      return;
    }

    //Parse basic information available in DAG for vertex and edges
    for (int i = 0; i < verticesInfo.length(); i++) {
      BasicVertexInfo basicVertexInfo = new BasicVertexInfo();

      JSONObject vJson = verticesInfo.getJSONObject(i);
      basicVertexInfo.vertexName =
          vJson.optString(Constants.VERTEX_NAME);
      JSONArray inEdges = vJson.optJSONArray(Constants.IN_EDGE_IDS);
      if (inEdges != null) {
        String[] inEdgeIds = new String[inEdges.length()];
        for (int j = 0; j < inEdges.length(); j++) {
          inEdgeIds[j] = inEdges.get(j).toString();
        }
        basicVertexInfo.inEdgeIds = inEdgeIds;
      }

      JSONArray outEdges = vJson.optJSONArray(Constants.OUT_EDGE_IDS);
      if (outEdges != null) {
        String[] outEdgeIds = new String[outEdges.length()];
        for (int j = 0; j < outEdges.length(); j++) {
          outEdgeIds[j] = outEdges.get(j).toString();
        }
        basicVertexInfo.outEdgeIds = outEdgeIds;
      }

      JSONArray addInputsJson =
          vJson.optJSONArray(Constants.ADDITIONAL_INPUTS);
      basicVertexInfo.additionalInputs = parseAdditionalDetailsForVertex(addInputsJson);

      JSONArray addOutputsJson =
          vJson.optJSONArray(Constants.ADDITIONAL_OUTPUTS);
      basicVertexInfo.additionalOutputs = parseAdditionalDetailsForVertex(addOutputsJson);

      basicVertexInfoMap.put(basicVertexInfo.vertexName, basicVertexInfo);
    }
  }

  /**
   * get additional details available for every vertex in the dag
   *
   * @param jsonArray
   * @return AdditionalInputOutputDetails[]
   * @throws JSONException
   */
  private AdditionalInputOutputDetails[] parseAdditionalDetailsForVertex(JSONArray jsonArray) throws
      JSONException {
    if (jsonArray != null) {
      AdditionalInputOutputDetails[]
          additionalInputOutputDetails = new AdditionalInputOutputDetails[jsonArray.length()];
      for (int j = 0; j < jsonArray.length(); j++) {
        String name = jsonArray.getJSONObject(j).optString(
            Constants.NAME);
        String clazz = jsonArray.getJSONObject(j).optString(
            Constants.CLASS);
        String initializer =
            jsonArray.getJSONObject(j).optString(Constants.INITIALIZER);
        String userPayloadText = jsonArray.getJSONObject(j).optString(
            Constants.USER_PAYLOAD_TEXT);

        additionalInputOutputDetails[j] =
            new AdditionalInputOutputDetails(name, clazz, initializer, userPayloadText);

      }
      return additionalInputOutputDetails;
    }
    return null;
  }

  /**
   * Parse edge details in the DAG
   *
   * @param edgesArray
   *
   * @throws JSONException
   */
  private void parseEdges(JSONArray edgesArray) throws JSONException {
    if (edgesArray == null) {
      return;
    }
    for (int i = 0; i < edgesArray.length(); i++) {
      JSONObject edge = edgesArray.getJSONObject(i);
      Integer edgeId = edge.optInt(Constants.EDGE_ID);
      String inputVertexName =
          edge.optString(Constants.INPUT_VERTEX_NAME);
      String outputVertexName =
          edge.optString(Constants.OUTPUT_VERTEX_NAME);
      String dataMovementType =
          edge.optString(Constants.DATA_MOVEMENT_TYPE);
      String edgeSourceClass =
          edge.optString(Constants.EDGE_SOURCE_CLASS);
      String edgeDestinationClass =
          edge.optString(Constants.EDGE_DESTINATION_CLASS);
      String inputUserPayloadAsText =
          edge.optString(Constants.INPUT_PAYLOAD_TEXT);
      String outputUserPayloadAsText =
          edge.optString(Constants.OUTPUT_PAYLOAD_TEXT);
      EdgeInfo edgeInfo = new EdgeInfo(inputVertexName, outputVertexName,
          dataMovementType, edgeSourceClass, edgeDestinationClass, inputUserPayloadAsText,
          outputUserPayloadAsText);
      edgeInfoMap.put(edgeId, edgeInfo);
    }
  }

  static class BasicVertexInfo {
    String vertexName;
    String[] inEdgeIds;
    String[] outEdgeIds;
    AdditionalInputOutputDetails[] additionalInputs;
    AdditionalInputOutputDetails[] additionalOutputs;
  }

  void addVertexInfo(VertexInfo vertexInfo) {
    BasicVertexInfo basicVertexInfo = basicVertexInfoMap.get(vertexInfo.getVertexName());

    Preconditions.checkArgument(basicVertexInfo != null,
        "VerteName " + vertexInfo.getVertexName()
            + " not present in DAG's vertices " + basicVertexInfoMap.entrySet());

    //populate additional information in VertexInfo
    if (basicVertexInfo.additionalInputs != null) {
      vertexInfo.setAdditionalInputInfoList(Arrays.asList(basicVertexInfo.additionalInputs));
    }
    if (basicVertexInfo.additionalOutputs != null) {
      vertexInfo.setAdditionalOutputInfoList(Arrays.asList(basicVertexInfo.additionalOutputs));
    }

    //Populate edge information in vertex
    if (basicVertexInfo.inEdgeIds != null) {
      for (String edge : basicVertexInfo.inEdgeIds) {
        EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edge));
        Preconditions.checkState(edgeInfo != null, "EdgeId " + edge + " not present in DAG");
        vertexInfo.addInEdge(edgeInfo);
      }
    }

    if (basicVertexInfo.outEdgeIds != null) {
      for (String edge : basicVertexInfo.outEdgeIds) {
        EdgeInfo edgeInfo = edgeInfoMap.get(Integer.parseInt(edge));
        Preconditions.checkState(edgeInfo != null, "EdgeId " + edge + " not present in DAG");
        vertexInfo.addOutEdge(edgeInfo);
      }
    }

    vertexNameMap.put(vertexInfo.getVertexName(), vertexInfo);
  }

  void setAppConfig(Map<String, String> config) {
    this.config = config;
  }

  public Map<String, String> getAppConfig() {
    return (config != null) ? Collections.unmodifiableMap(config) : null;
  }

  void setVersionInfo(VersionInfo versionInfo) {
    this.versionInfo = versionInfo;
  }

  void addContainerMapping(Container container, TaskAttemptInfo taskAttemptInfo) {
    this.containerMapping.put(container, taskAttemptInfo);
  }

  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("[");
    sb.append("dagID=").append(getDagId()).append(", ");
    sb.append("dagName=").append(getName()).append(", ");
    sb.append("status=").append(getStatus()).append(", ");
    sb.append("startTime=").append(getStartTimeInterval()).append(", ");
    sb.append("submitTime=").append(getSubmitTime()).append(", ");
    sb.append("endTime=").append(getFinishTimeInterval()).append(", ");
    sb.append("timeTaken=").append(getTimeTaken()).append(", ");
    sb.append("diagnostics=").append(getDiagnostics()).append(", ");
    sb.append("vertexNameIDMapping=").append(getVertexNameIDMapping()).append(", ");
    sb.append("failedTasks=").append(getFailedTaskCount()).append(", ");
    sb.append("events=").append(getEvents()).append(", ");
    sb.append("status=").append(getStatus());
    sb.append("]");
    return sb.toString();
  }

  public Multimap<Container, TaskAttemptInfo> getContainerMapping() {
    return Multimaps.unmodifiableMultimap(containerMapping);
  }

  public final VersionInfo getVersionInfo() {
    return versionInfo;
  }

  public final CallerContext getCallerContext() {
    return callerContext;
  }

  public final String getName() {
    return name;
  }

  public final Collection<EdgeInfo> getEdges() {
    return Collections.unmodifiableCollection(edgeInfoMap.values());
  }

  public final long getSubmitTime() {
    return submitTime;
  }

  public final long getStartTime() {
    return startTime;
  }

  public final long getFinishTime() {
    return endTime;
  }

  /**
   * Reference start time for the DAG. Vertex, Task, TaskAttempt would map on to this.
   * If absolute start time is needed, call getAbsStartTime().
   *
   * @return starting time w.r.t to dag
   */
  public final long getStartTimeInterval() {
    return 0;
  }

  @Override
  public final long getFinishTimeInterval() {
    long dagEndTime = (endTime - startTime);
    if (dagEndTime < 0) {
      //probably dag is not complete or failed in middle. get the last task attempt time
      for (VertexInfo vertexInfo : getVertices()) {
        dagEndTime = (vertexInfo.getFinishTimeInterval() > dagEndTime) ? vertexInfo.getFinishTimeInterval() : dagEndTime;
      }
    }
    return dagEndTime;
  }

  public final long getTimeTaken() {
    return getFinishTimeInterval();
  }

  public final String getStatus() {
    return status;
  }

  /**
   * Get vertexInfo for a given vertexid
   *
   * @param vertexId
   * @return VertexInfo
   */
  public VertexInfo getVertexFromId(String vertexId) {
    return vertexNameMap.get(vertexNameIDMapping.getKey(vertexId));
  }

  /**
   * Get vertexInfo for a given vertex name
   *
   * @param vertexName
   * @return VertexInfo
   */
  public final VertexInfo getVertex(String vertexName) {
    return vertexNameMap.get(vertexName);
  }

  public final String getDiagnostics() {
    return diagnostics;
  }

  /**
   * Get all vertices
   *
   * @return List<VertexInfo>
   */
  public final List<VertexInfo> getVertices() {
    List<VertexInfo> vertices = Lists.newLinkedList(vertexNameMap.values());
    Collections.sort(vertices, new Comparator<VertexInfo>() {

      @Override public int compare(VertexInfo o1, VertexInfo o2) {
        return (o1.getStartTimeInterval() < o2.getStartTimeInterval()) ? -1 :
            ((o1.getStartTimeInterval() == o2.getStartTimeInterval()) ?
                0 : 1);
      }
    });
    return Collections.unmodifiableList(vertices);
  }

  /**
   * Get list of failed vertices
   *
   * @return List<VertexInfo>
   */
  public final List<VertexInfo> getFailedVertices() {
    return getVertices(VertexState.FAILED);
  }

  /**
   * Get list of killed vertices
   *
   * @return List<VertexInfo>
   */
  public final List<VertexInfo> getKilledVertices() {
    return getVertices(VertexState.KILLED);
  }

  /**
   * Get list of failed vertices
   *
   * @return List<VertexInfo>
   */
  public final List<VertexInfo> getSuccessfullVertices() {
    return getVertices(VertexState.SUCCEEDED);
  }

  /**
   * Get list of vertices belonging to a specific state
   *
   * @param state
   * @return Collection<VertexInfo>
   */
  public final List<VertexInfo> getVertices(final VertexState state) {
    return Collections.unmodifiableList(Lists.newLinkedList(Iterables.filter(Lists.newLinkedList
                    (vertexNameMap.values()), new Predicate<VertexInfo>() {
                  @Override public boolean apply(VertexInfo input) {
                    return input.getStatus() != null && input.getStatus().equals(state.toString());
                  }
                }
            )
        )
    );
  }

  public final Map<String, VertexInfo> getVertexMapping() {
    return Collections.unmodifiableMap(vertexNameMap);
  }

  private Ordering<VertexInfo> getVertexOrdering() {
    return Ordering.from(new Comparator<VertexInfo>() {
      @Override public int compare(VertexInfo o1, VertexInfo o2) {
        return (o1.getTimeTaken() < o2.getTimeTaken()) ? -1 :
            ((o1.getTimeTaken() == o2.getTimeTaken()) ?
                0 : 1);
      }
    });
  }

  /**
   * Get the slowest vertex in the DAG
   *
   * @return VertexInfo
   */
  public final VertexInfo getSlowestVertex() {
    List<VertexInfo> vertexInfoList = getVertices();
    if (vertexInfoList.size() == 0) {
      return null;
    }
    return getVertexOrdering().max(vertexInfoList);
  }

  /**
   * Get the slowest vertex in the DAG
   *
   * @return VertexInfo
   */
  public final VertexInfo getFastestVertex() {
    List<VertexInfo> vertexInfoList = getVertices();
    if (vertexInfoList.size() == 0) {
      return null;
    }
    return getVertexOrdering().min(vertexInfoList);
  }

  /**
   * Get node details for this DAG. Would be useful for analyzing node to tasks.
   *
   * @return Multimap<String, TaskAttemptInfo> taskAttempt details at every node
   */
  public final Multimap<String, TaskAttemptInfo> getNodeDetails() {
    Multimap<String, TaskAttemptInfo> nodeDetails = LinkedListMultimap.create();
    for (VertexInfo vertexInfo : getVertices()) {
      Multimap<Container, TaskAttemptInfo> containerMapping = vertexInfo.getContainersMapping();
      for (Map.Entry<Container, TaskAttemptInfo> entry : containerMapping.entries()) {
        nodeDetails.put(entry.getKey().getHost(), entry.getValue());
      }
    }
    return nodeDetails;
  }

  /**
   * Get containers used for this DAG
   *
   * @return Multimap<Container, TaskAttemptInfo> task attempt details at every container
   */
  public final Multimap<Container, TaskAttemptInfo> getContainersToTaskAttemptMapping() {
    List<VertexInfo> VertexInfoList = getVertices();
    Multimap<Container, TaskAttemptInfo> containerMapping = LinkedHashMultimap.create();

    for (VertexInfo vertexInfo : VertexInfoList) {
      containerMapping.putAll(vertexInfo.getContainersMapping());
    }
    return Multimaps.unmodifiableMultimap(containerMapping);
  }

  public final Map<String, String> getVertexNameIDMapping() {
    return vertexNameIDMapping;
  }

  public final int getNumVertices() {
    return numVertices;
  }

  public final String getDagId() {
    return dagId;
  }

  public final int getFailedTaskCount() {
    return failedTasks;
  }

  public final String getUserName() {
    return userName;
  }

  final void setUserName(String userName) {
    this.userName = userName;
  }

}
