TEZ-3958: Add internal vertex priority information into the tez dag.dot debug information (Jaume Marhuenda via Gopal V)
diff --git a/tez-dag/src/main/java/org/apache/tez/Utils.java b/tez-dag/src/main/java/org/apache/tez/Utils.java
index 6f03a67..dbde327 100644
--- a/tez-dag/src/main/java/org/apache/tez/Utils.java
+++ b/tez-dag/src/main/java/org/apache/tez/Utils.java
@@ -14,18 +14,30 @@
 
 package org.apache.tez;
 
+import javax.annotation.Nullable;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.event.Event;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGScheduler;
 import org.apache.tez.dag.app.dag.DAGTerminationCause;
+import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
 import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.utils.Graph;
 import org.apache.tez.serviceplugins.api.DagInfo;
 import org.apache.tez.serviceplugins.api.ServicePluginError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 @InterfaceAudience.Private
 /**
  * Utility class within the tez-dag module
@@ -34,6 +46,11 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
 
+  /**
+   * Pattern to clean the labels in the .dot generation.
+   */
+  private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
+
   public static String getContainerLauncherIdentifierString(int launcherIndex, AppContext appContext) {
     String name;
     try {
@@ -92,6 +109,156 @@
     }
   }
 
+  /**
+   * Generate a visualization file.
+   * @param dag DAG.
+   * @param dagPB DAG plan.
+   * @param scheduler scheduler that provide the priorities of the vertexes.
+   */
+  public static void generateDAGVizFile(final DAG dag,
+      final DAGProtos.DAGPlan dagPB, @Nullable final DAGScheduler scheduler) {
+    generateDAGVizFile(dag, dagPB, TezCommonUtils.getTrimmedStrings(
+        System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())),
+        scheduler);
+  }
+
+  /**
+   * Generate a visualization file.
+   * @param dag DAG.
+   * @param dagPB DAG plan.
+   * @param logDirs directories where the file will be written.
+   * @param scheduler scheduler that will provide the priorities
+   *                  of the vertexes.
+   */
+  public static void generateDAGVizFile(final DAG dag,
+      final DAGProtos.DAGPlan dagPB,
+      final String[] logDirs, final @Nullable DAGScheduler scheduler) {
+    TezDAGID dagId = dag.getID();
+
+    HashMap<String, Vertex> nameToVertex = null;
+    if (scheduler != null) {
+      nameToVertex = new HashMap<>(dag.getVertices().size());
+      for (Vertex v: dag.getVertices().values()) {
+        nameToVertex.put(v.getName(), v);
+      }
+    }
+
+    Graph graph = new Graph(sanitizeLabelForViz(dagPB.getName()));
+    for (DAGProtos.VertexPlan vertexPlan : dagPB.getVertexList()) {
+      StringBuilder nodeLabel = new StringBuilder(
+          sanitizeLabelForViz(vertexPlan.getName())
+          + "[" + getShortClassName(
+              vertexPlan.getProcessorDescriptor().getClassName()));
+
+      if (scheduler != null) {
+        Vertex vertex = nameToVertex.get(vertexPlan.getName());
+        if (vertex != null) {
+          try {
+            int priority = (scheduler.getPriorityLowLimit(dag, vertex)
+                + scheduler.getPriorityHighLimit(dag,vertex)) / 2;
+            nodeLabel.append(", priority=").append(priority).append("]");
+          } catch (UnsupportedOperationException e) {
+            LOG.info("The DAG graphviz file with priorities will not"
+                + " be generate since the scheduler "
+                + scheduler.getClass().getSimpleName() + " doesn't"
+                + " override the methods to get the priorities");
+            return;
+          }
+        }
+      }
+      Graph.Node n = graph.newNode(sanitizeLabelForViz(vertexPlan.getName()),
+          nodeLabel.toString());
+      for (DAGProtos.RootInputLeafOutputProto input
+          : vertexPlan.getInputsList()) {
+        Graph.Node inputNode = graph.getNode(
+            sanitizeLabelForViz(vertexPlan.getName())
+            + "_" + sanitizeLabelForViz(input.getName()));
+        inputNode.setLabel(sanitizeLabelForViz(vertexPlan.getName())
+            + "[" + sanitizeLabelForViz(input.getName()) + "]");
+        inputNode.setShape("box");
+        inputNode.addEdge(n, "Input"
+            + " [inputClass=" + getShortClassName(
+                  input.getIODescriptor().getClassName())
+            + ", initializer=" + getShortClassName(
+                  input.getControllerDescriptor().getClassName()) + "]");
+      }
+      for (DAGProtos.RootInputLeafOutputProto output
+          : vertexPlan.getOutputsList()) {
+        Graph.Node outputNode = graph.getNode(sanitizeLabelForViz(
+                vertexPlan.getName())
+            + "_" + sanitizeLabelForViz(output.getName()));
+        outputNode.setLabel(sanitizeLabelForViz(vertexPlan.getName())
+            + "[" + sanitizeLabelForViz(output.getName()) + "]");
+        outputNode.setShape("box");
+        n.addEdge(outputNode, "Output"
+            + " [outputClass=" + getShortClassName(
+                  output.getIODescriptor().getClassName())
+            + ", committer=" + getShortClassName(
+                  output.getControllerDescriptor().getClassName()) + "]");
+      }
+    }
+
+    for (DAGProtos.EdgePlan e : dagPB.getEdgeList()) {
+
+      Graph.Node n = graph.getNode(sanitizeLabelForViz(
+          e.getInputVertexName()));
+      n.addEdge(graph.getNode(sanitizeLabelForViz(
+          e.getOutputVertexName())),
+          "["
+              + "input=" + getShortClassName(e.getEdgeSource().getClassName())
+              + ", output=" + getShortClassName(
+                    e.getEdgeDestination().getClassName())
+              + ", dataMovement=" + e.getDataMovementType().name().trim()
+              + ", schedulingType="
+              + e.getSchedulingType().name().trim() + "]");
+    }
+
+    String outputFile = "";
+    if (logDirs != null && logDirs.length != 0) {
+      outputFile += logDirs[0];
+      outputFile += File.separator;
+    }
+    outputFile += dagId.toString();
+    // Means we have set the priorities
+    if (scheduler != null) {
+      outputFile += "_priority";
+    }
+    outputFile += ".dot";
+
+    try {
+      LOG.info("Generating DAG graphviz file"
+           + ", dagId=" + dagId.toString()
+           + ", filePath=" + outputFile);
+      graph.save(outputFile);
+    } catch (Exception e) {
+      LOG.warn("Error occurred when trying to save graph structure"
+          + " for dag " + dagId.toString(), e);
+    }
+  }
+
+  /**
+   * Get the short name of the class.
+   * @param className long name
+   * @return short name
+   */
+  private static String getShortClassName(final String className) {
+    int pos = className.lastIndexOf(".");
+    if (pos != -1 && pos < className.length() - 1) {
+      return className.substring(pos + 1);
+    }
+    return className;
+  }
+
+  /**
+   * Replace some characters with underscores.
+   * @param label label to sanitize
+   * @return the label with the replaced characters
+   */
+  private static String sanitizeLabelForViz(final String label) {
+    Matcher m = sanitizeLabelPattern.matcher(label);
+    return m.replaceAll("_");
+  }
+
   @SuppressWarnings("unchecked")
   private static void sendEvent(AppContext appContext, Event<?> event) {
     appContext.getEventHandler().handle(event);
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c4b8df0..42a9d57 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -55,8 +55,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
@@ -66,6 +64,7 @@
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.Options;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.Utils;
 import org.apache.tez.client.CallerContext;
 import org.apache.tez.client.TezClientUtils;
 import org.apache.tez.common.TezUtils;
@@ -131,7 +130,6 @@
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.DAGClientHandler;
 import org.apache.tez.dag.api.client.DAGClientServer;
-import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
@@ -179,7 +177,6 @@
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.utils.Graph;
 import org.apache.tez.dag.utils.RelocalizationUtils;
 import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
 import org.apache.tez.hadoop.shim.HadoopShim;
@@ -227,8 +224,6 @@
   public static final int SHUTDOWN_HOOK_PRIORITY = 30;
   private static final Joiner PATH_JOINER = Joiner.on('/');
 
-  private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
-
   @VisibleForTesting
   static final String INVALID_SESSION_ERR_MSG = "Initial application attempt in session mode failed. "
       + "Application cannot recover and continue properly as DAG recovery has been disabled";
@@ -1038,82 +1033,11 @@
       LOG.warn("Failed to generate json for DAG", e);
     }
 
-    generateDAGVizFile(dagId, dagPB, logDirs);
+    Utils.generateDAGVizFile(newDag, dagPB, logDirs, newDag.getDAGScheduler());
     writePBTextFile(newDag);
     return newDag;
   } // end createDag()
 
-  String getShortClassName(String className) {
-    int pos = className.lastIndexOf(".");
-    if (pos != -1 && pos < className.length()-1) {
-      return className.substring(pos+1);
-    }
-    return className;
-  }
-
-
-  private String sanitizeLabelForViz(String label) {
-    Matcher m = sanitizeLabelPattern.matcher(label);
-    return m.replaceAll("_");
-  }
-
-  private void generateDAGVizFile(TezDAGID dagId, DAGPlan dagPB, String[] logDirs) {
-    Graph graph = new Graph(sanitizeLabelForViz(dagPB.getName()));
-
-    for (VertexPlan v : dagPB.getVertexList()) {
-      String nodeLabel = sanitizeLabelForViz(v.getName())
-          + "[" + getShortClassName(v.getProcessorDescriptor().getClassName() + "]");
-      Graph.Node n = graph.newNode(sanitizeLabelForViz(v.getName()), nodeLabel);
-      for (DAGProtos.RootInputLeafOutputProto input : v.getInputsList()) {
-        Graph.Node inputNode = graph.getNode(sanitizeLabelForViz(v.getName())
-            + "_" + sanitizeLabelForViz(input.getName()));
-        inputNode.setLabel(sanitizeLabelForViz(v.getName())
-            + "[" + sanitizeLabelForViz(input.getName()) + "]");
-        inputNode.setShape("box");
-        inputNode.addEdge(n, "Input"
-            + " [inputClass=" + getShortClassName(input.getIODescriptor().getClassName())
-            + ", initializer=" + getShortClassName(input.getControllerDescriptor().getClassName()) + "]");
-      }
-      for (DAGProtos.RootInputLeafOutputProto output : v.getOutputsList()) {
-        Graph.Node outputNode = graph.getNode(sanitizeLabelForViz(v.getName())
-            + "_" + sanitizeLabelForViz(output.getName()));
-        outputNode.setLabel(sanitizeLabelForViz(v.getName())
-            + "[" + sanitizeLabelForViz(output.getName()) + "]");
-        outputNode.setShape("box");
-        n.addEdge(outputNode, "Output"
-            + " [outputClass=" + getShortClassName(output.getIODescriptor().getClassName())
-            + ", committer=" + getShortClassName(output.getControllerDescriptor().getClassName()) + "]");
-      }
-    }
-
-    for (DAGProtos.EdgePlan e : dagPB.getEdgeList()) {
-
-      Graph.Node n = graph.getNode(sanitizeLabelForViz(e.getInputVertexName()));
-      n.addEdge(graph.getNode(sanitizeLabelForViz(e.getOutputVertexName())),
-          "["
-          + "input=" + getShortClassName(e.getEdgeSource().getClassName())
-          + ", output=" + getShortClassName(e.getEdgeDestination().getClassName())
-          + ", dataMovement=" + e.getDataMovementType().name().trim()
-          + ", schedulingType=" + e.getSchedulingType().name().trim() + "]");
-    }
-
-    String outputFile = "";
-    if (logDirs != null && logDirs.length != 0) {
-      outputFile += logDirs[0];
-      outputFile += File.separator;
-    }
-    outputFile += dagId.toString() + ".dot";
-
-    try {
-      LOG.info("Generating DAG graphviz file"
-          + ", dagId=" + dagId.toString()
-          + ", filePath=" + outputFile);
-      graph.save(outputFile);
-    } catch (Exception e) {
-      LOG.warn("Error occurred when trying to save graph structure"
-          + " for dag " + dagId.toString(), e);
-    }
-  }
 
   private void writePBTextFile(DAG dag) {
     if (dag.getConf().getBoolean(TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS,
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 10c4257..5c2eba1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -38,6 +38,8 @@
 import org.apache.tez.dag.records.TezVertexID;
 import org.apache.tez.serviceplugins.api.DagInfo;
 
+import javax.annotation.Nullable;
+
 /**
  * Main interface to interact with the job.
  */
@@ -97,4 +99,11 @@
 
   org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext();
 
+  /**
+   *
+   * @return the DAGScheduler that will schedule
+   * this DAG, null if it doesn't exist
+   */
+  @Nullable DAGScheduler getDAGScheduler();
+
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
index 3055cd3..2fa735e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
@@ -89,4 +89,26 @@
   public abstract void scheduleTaskEx(DAGEventSchedulerUpdate event);
   
   public abstract void taskCompletedEx(DAGEventSchedulerUpdate event);
+
+  /**
+   * Get the low limit priority for a particular vertex.
+   * @param vertex to get the priority of
+   * @return the priority
+   */
+  public int getPriorityLowLimit(final DAG dag, final Vertex vertex) {
+    final int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
+    return ((vertexDistanceFromRoot + 1) * dag.getTotalVertices() * 3)
+        + (vertex.getVertexId().getId() * 3);
+  }
+
+  /**
+   * Get the low hight priority for a particular vertex. Default
+   * to the low limit priority minus two.
+   * @param vertex to get the priority of
+   * @return the priority
+   */
+  public int getPriorityHighLimit(final DAG dag, final Vertex vertex) {
+    return  getPriorityLowLimit(dag, vertex) - 2;
+  }
+
 }
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index bd5e0ff..6dcc7f0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -42,6 +42,7 @@
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.Utils;
 import org.apache.tez.common.TezUtilsInternal;
 import org.apache.tez.common.counters.LimitExceededException;
 import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
@@ -1620,6 +1621,9 @@
       }
     }
 
+    // This is going to override the previously generated file
+    // which didn't have the priorities
+    Utils.generateDAGVizFile(this, jobPlan, dagScheduler);
     return DAGState.INITED;
   }
 
@@ -2382,6 +2386,11 @@
     }
   }
 
+  @Override
+  public DAGScheduler getDAGScheduler() {
+    return dagScheduler;
+  }
+
   // output of either vertex or vertex group
   public static class OutputKey {
     String outputName;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
index 3a16f46..2383db8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
@@ -46,11 +46,10 @@
   public void scheduleTaskEx(DAGEventSchedulerUpdate event) {
     TaskAttempt attempt = event.getAttempt();
     Vertex vertex = dag.getVertex(attempt.getVertexID());
-    int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
 
     // natural priority. Handles failures and retries.
-    int priorityLowLimit = ((vertexDistanceFromRoot + 1) * dag.getTotalVertices() * 3) + (vertex.getVertexId().getId() * 3);
-    int priorityHighLimit = priorityLowLimit - 2;
+    int priorityLowLimit = getPriorityLowLimit(dag, vertex);
+    int priorityHighLimit = getPriorityHighLimit(dag, vertex);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Scheduling " + attempt.getID() + " between priorityLow: " + priorityLowLimit
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
index 34cc92f..c51783b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
@@ -76,11 +76,10 @@
   public void scheduleTaskEx(DAGEventSchedulerUpdate event) {
     TaskAttempt attempt = event.getAttempt();
     Vertex vertex = dag.getVertex(attempt.getVertexID());
-    int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
 
     // natural priority. Handles failures and retries.
-    int priorityLowLimit = ((vertexDistanceFromRoot + 1) * dag.getTotalVertices() * 3) + (vertex.getVertexId().getId() * 3);
-    int priorityHighLimit = priorityLowLimit - 2;
+    int priorityLowLimit = getPriorityLowLimit(dag, vertex);
+    int priorityHighLimit = getPriorityHighLimit(dag, vertex);
 
     TaskAttemptEventSchedule attemptEvent = new TaskAttemptEventSchedule(
         attempt.getID(), priorityLowLimit, priorityHighLimit);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index f38f689..07c361a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -58,14 +58,19 @@
     TaskAttempt mockAttempt = mock(TaskAttempt.class);
     when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex);
     when(mockDag.getTotalVertices()).thenReturn(4);
-    when(mockVertex.getDistanceFromRoot()).thenReturn(0).thenReturn(1)
-        .thenReturn(2);
+    when(mockVertex.getDistanceFromRoot())
+        .thenReturn(0).thenReturn(0)
+        .thenReturn(1).thenReturn(1)
+        .thenReturn(2).thenReturn(2);
     TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
     TezVertexID vId1 = TezVertexID.fromString("vertex_1436907267600_195589_1_01");
     TezVertexID vId2 = TezVertexID.fromString("vertex_1436907267600_195589_1_02");
     TezVertexID vId3 = TezVertexID.fromString("vertex_1436907267600_195589_1_03");
-    when(mockVertex.getVertexId()).thenReturn(vId0).thenReturn(vId1)
-        .thenReturn(vId2).thenReturn(vId3);
+    when(mockVertex.getVertexId())
+        .thenReturn(vId0).thenReturn(vId0)
+        .thenReturn(vId1).thenReturn(vId1)
+        .thenReturn(vId2).thenReturn(vId2)
+        .thenReturn(vId3).thenReturn(vId3);
     
     DAGEventSchedulerUpdate event = new DAGEventSchedulerUpdate(
         DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt);