TEZ-3958: Add internal vertex priority information into the tez dag.dot debug information (Jaume Marhuenda via Gopal V)
diff --git a/tez-dag/src/main/java/org/apache/tez/Utils.java b/tez-dag/src/main/java/org/apache/tez/Utils.java
index 6f03a67..dbde327 100644
--- a/tez-dag/src/main/java/org/apache/tez/Utils.java
+++ b/tez-dag/src/main/java/org/apache/tez/Utils.java
@@ -14,18 +14,30 @@
package org.apache.tez;
+import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.event.Event;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.dag.DAG;
+import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
+import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.utils.Graph;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.util.HashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
@InterfaceAudience.Private
/**
* Utility class within the tez-dag module
@@ -34,6 +46,11 @@
private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+ /**
+ * Pattern to clean the labels in the .dot generation.
+ */
+ private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
+
public static String getContainerLauncherIdentifierString(int launcherIndex, AppContext appContext) {
String name;
try {
@@ -92,6 +109,156 @@
}
}
+ /**
+ * Generate a visualization file.
+ * @param dag DAG.
+ * @param dagPB DAG plan.
+ * @param scheduler scheduler that provide the priorities of the vertexes.
+ */
+ public static void generateDAGVizFile(final DAG dag,
+ final DAGProtos.DAGPlan dagPB, @Nullable final DAGScheduler scheduler) {
+ generateDAGVizFile(dag, dagPB, TezCommonUtils.getTrimmedStrings(
+ System.getenv(ApplicationConstants.Environment.LOG_DIRS.name())),
+ scheduler);
+ }
+
+ /**
+ * Generate a visualization file.
+ * @param dag DAG.
+ * @param dagPB DAG plan.
+ * @param logDirs directories where the file will be written.
+ * @param scheduler scheduler that will provide the priorities
+ * of the vertexes.
+ */
+ public static void generateDAGVizFile(final DAG dag,
+ final DAGProtos.DAGPlan dagPB,
+ final String[] logDirs, final @Nullable DAGScheduler scheduler) {
+ TezDAGID dagId = dag.getID();
+
+ HashMap<String, Vertex> nameToVertex = null;
+ if (scheduler != null) {
+ nameToVertex = new HashMap<>(dag.getVertices().size());
+ for (Vertex v: dag.getVertices().values()) {
+ nameToVertex.put(v.getName(), v);
+ }
+ }
+
+ Graph graph = new Graph(sanitizeLabelForViz(dagPB.getName()));
+ for (DAGProtos.VertexPlan vertexPlan : dagPB.getVertexList()) {
+ StringBuilder nodeLabel = new StringBuilder(
+ sanitizeLabelForViz(vertexPlan.getName())
+ + "[" + getShortClassName(
+ vertexPlan.getProcessorDescriptor().getClassName()));
+
+ if (scheduler != null) {
+ Vertex vertex = nameToVertex.get(vertexPlan.getName());
+ if (vertex != null) {
+ try {
+ int priority = (scheduler.getPriorityLowLimit(dag, vertex)
+ + scheduler.getPriorityHighLimit(dag,vertex)) / 2;
+ nodeLabel.append(", priority=").append(priority).append("]");
+ } catch (UnsupportedOperationException e) {
+ LOG.info("The DAG graphviz file with priorities will not"
+ + " be generate since the scheduler "
+ + scheduler.getClass().getSimpleName() + " doesn't"
+ + " override the methods to get the priorities");
+ return;
+ }
+ }
+ }
+ Graph.Node n = graph.newNode(sanitizeLabelForViz(vertexPlan.getName()),
+ nodeLabel.toString());
+ for (DAGProtos.RootInputLeafOutputProto input
+ : vertexPlan.getInputsList()) {
+ Graph.Node inputNode = graph.getNode(
+ sanitizeLabelForViz(vertexPlan.getName())
+ + "_" + sanitizeLabelForViz(input.getName()));
+ inputNode.setLabel(sanitizeLabelForViz(vertexPlan.getName())
+ + "[" + sanitizeLabelForViz(input.getName()) + "]");
+ inputNode.setShape("box");
+ inputNode.addEdge(n, "Input"
+ + " [inputClass=" + getShortClassName(
+ input.getIODescriptor().getClassName())
+ + ", initializer=" + getShortClassName(
+ input.getControllerDescriptor().getClassName()) + "]");
+ }
+ for (DAGProtos.RootInputLeafOutputProto output
+ : vertexPlan.getOutputsList()) {
+ Graph.Node outputNode = graph.getNode(sanitizeLabelForViz(
+ vertexPlan.getName())
+ + "_" + sanitizeLabelForViz(output.getName()));
+ outputNode.setLabel(sanitizeLabelForViz(vertexPlan.getName())
+ + "[" + sanitizeLabelForViz(output.getName()) + "]");
+ outputNode.setShape("box");
+ n.addEdge(outputNode, "Output"
+ + " [outputClass=" + getShortClassName(
+ output.getIODescriptor().getClassName())
+ + ", committer=" + getShortClassName(
+ output.getControllerDescriptor().getClassName()) + "]");
+ }
+ }
+
+ for (DAGProtos.EdgePlan e : dagPB.getEdgeList()) {
+
+ Graph.Node n = graph.getNode(sanitizeLabelForViz(
+ e.getInputVertexName()));
+ n.addEdge(graph.getNode(sanitizeLabelForViz(
+ e.getOutputVertexName())),
+ "["
+ + "input=" + getShortClassName(e.getEdgeSource().getClassName())
+ + ", output=" + getShortClassName(
+ e.getEdgeDestination().getClassName())
+ + ", dataMovement=" + e.getDataMovementType().name().trim()
+ + ", schedulingType="
+ + e.getSchedulingType().name().trim() + "]");
+ }
+
+ String outputFile = "";
+ if (logDirs != null && logDirs.length != 0) {
+ outputFile += logDirs[0];
+ outputFile += File.separator;
+ }
+ outputFile += dagId.toString();
+ // Means we have set the priorities
+ if (scheduler != null) {
+ outputFile += "_priority";
+ }
+ outputFile += ".dot";
+
+ try {
+ LOG.info("Generating DAG graphviz file"
+ + ", dagId=" + dagId.toString()
+ + ", filePath=" + outputFile);
+ graph.save(outputFile);
+ } catch (Exception e) {
+ LOG.warn("Error occurred when trying to save graph structure"
+ + " for dag " + dagId.toString(), e);
+ }
+ }
+
+ /**
+ * Get the short name of the class.
+ * @param className long name
+ * @return short name
+ */
+ private static String getShortClassName(final String className) {
+ int pos = className.lastIndexOf(".");
+ if (pos != -1 && pos < className.length() - 1) {
+ return className.substring(pos + 1);
+ }
+ return className;
+ }
+
+ /**
+ * Replace some characters with underscores.
+ * @param label label to sanitize
+ * @return the label with the replaced characters
+ */
+ private static String sanitizeLabelForViz(final String label) {
+ Matcher m = sanitizeLabelPattern.matcher(label);
+ return m.replaceAll("_");
+ }
+
@SuppressWarnings("unchecked")
private static void sendEvent(AppContext appContext, Event<?> event) {
appContext.getEventHandler().handle(event);
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index c4b8df0..42a9d57 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -55,8 +55,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
@@ -66,6 +64,7 @@
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.Utils;
import org.apache.tez.client.CallerContext;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezUtils;
@@ -131,7 +130,6 @@
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.client.DAGClientHandler;
import org.apache.tez.dag.api.client.DAGClientServer;
-import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourcesProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
@@ -179,7 +177,6 @@
import org.apache.tez.dag.history.utils.DAGUtils;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezVertexID;
-import org.apache.tez.dag.utils.Graph;
import org.apache.tez.dag.utils.RelocalizationUtils;
import org.apache.tez.dag.utils.Simple2LevelVersionComparator;
import org.apache.tez.hadoop.shim.HadoopShim;
@@ -227,8 +224,6 @@
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Joiner PATH_JOINER = Joiner.on('/');
- private static Pattern sanitizeLabelPattern = Pattern.compile("[:\\-\\W]+");
-
@VisibleForTesting
static final String INVALID_SESSION_ERR_MSG = "Initial application attempt in session mode failed. "
+ "Application cannot recover and continue properly as DAG recovery has been disabled";
@@ -1038,82 +1033,11 @@
LOG.warn("Failed to generate json for DAG", e);
}
- generateDAGVizFile(dagId, dagPB, logDirs);
+ Utils.generateDAGVizFile(newDag, dagPB, logDirs, newDag.getDAGScheduler());
writePBTextFile(newDag);
return newDag;
} // end createDag()
- String getShortClassName(String className) {
- int pos = className.lastIndexOf(".");
- if (pos != -1 && pos < className.length()-1) {
- return className.substring(pos+1);
- }
- return className;
- }
-
-
- private String sanitizeLabelForViz(String label) {
- Matcher m = sanitizeLabelPattern.matcher(label);
- return m.replaceAll("_");
- }
-
- private void generateDAGVizFile(TezDAGID dagId, DAGPlan dagPB, String[] logDirs) {
- Graph graph = new Graph(sanitizeLabelForViz(dagPB.getName()));
-
- for (VertexPlan v : dagPB.getVertexList()) {
- String nodeLabel = sanitizeLabelForViz(v.getName())
- + "[" + getShortClassName(v.getProcessorDescriptor().getClassName() + "]");
- Graph.Node n = graph.newNode(sanitizeLabelForViz(v.getName()), nodeLabel);
- for (DAGProtos.RootInputLeafOutputProto input : v.getInputsList()) {
- Graph.Node inputNode = graph.getNode(sanitizeLabelForViz(v.getName())
- + "_" + sanitizeLabelForViz(input.getName()));
- inputNode.setLabel(sanitizeLabelForViz(v.getName())
- + "[" + sanitizeLabelForViz(input.getName()) + "]");
- inputNode.setShape("box");
- inputNode.addEdge(n, "Input"
- + " [inputClass=" + getShortClassName(input.getIODescriptor().getClassName())
- + ", initializer=" + getShortClassName(input.getControllerDescriptor().getClassName()) + "]");
- }
- for (DAGProtos.RootInputLeafOutputProto output : v.getOutputsList()) {
- Graph.Node outputNode = graph.getNode(sanitizeLabelForViz(v.getName())
- + "_" + sanitizeLabelForViz(output.getName()));
- outputNode.setLabel(sanitizeLabelForViz(v.getName())
- + "[" + sanitizeLabelForViz(output.getName()) + "]");
- outputNode.setShape("box");
- n.addEdge(outputNode, "Output"
- + " [outputClass=" + getShortClassName(output.getIODescriptor().getClassName())
- + ", committer=" + getShortClassName(output.getControllerDescriptor().getClassName()) + "]");
- }
- }
-
- for (DAGProtos.EdgePlan e : dagPB.getEdgeList()) {
-
- Graph.Node n = graph.getNode(sanitizeLabelForViz(e.getInputVertexName()));
- n.addEdge(graph.getNode(sanitizeLabelForViz(e.getOutputVertexName())),
- "["
- + "input=" + getShortClassName(e.getEdgeSource().getClassName())
- + ", output=" + getShortClassName(e.getEdgeDestination().getClassName())
- + ", dataMovement=" + e.getDataMovementType().name().trim()
- + ", schedulingType=" + e.getSchedulingType().name().trim() + "]");
- }
-
- String outputFile = "";
- if (logDirs != null && logDirs.length != 0) {
- outputFile += logDirs[0];
- outputFile += File.separator;
- }
- outputFile += dagId.toString() + ".dot";
-
- try {
- LOG.info("Generating DAG graphviz file"
- + ", dagId=" + dagId.toString()
- + ", filePath=" + outputFile);
- graph.save(outputFile);
- } catch (Exception e) {
- LOG.warn("Error occurred when trying to save graph structure"
- + " for dag " + dagId.toString(), e);
- }
- }
private void writePBTextFile(DAG dag) {
if (dag.getConf().getBoolean(TezConfiguration.TEZ_GENERATE_DEBUG_ARTIFACTS,
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
index 10c4257..5c2eba1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAG.java
@@ -38,6 +38,8 @@
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.serviceplugins.api.DagInfo;
+import javax.annotation.Nullable;
+
/**
* Main interface to interact with the job.
*/
@@ -97,4 +99,11 @@
org.apache.tez.dag.api.Vertex.VertexExecutionContext getDefaultExecutionContext();
+ /**
+ *
+ * @return the DAGScheduler that will schedule
+ * this DAG, null if it doesn't exist
+ */
+ @Nullable DAGScheduler getDAGScheduler();
+
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
index 3055cd3..2fa735e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/DAGScheduler.java
@@ -89,4 +89,26 @@
public abstract void scheduleTaskEx(DAGEventSchedulerUpdate event);
public abstract void taskCompletedEx(DAGEventSchedulerUpdate event);
+
+ /**
+ * Get the low limit priority for a particular vertex.
+ * @param vertex to get the priority of
+ * @return the priority
+ */
+ public int getPriorityLowLimit(final DAG dag, final Vertex vertex) {
+ final int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
+ return ((vertexDistanceFromRoot + 1) * dag.getTotalVertices() * 3)
+ + (vertex.getVertexId().getId() * 3);
+ }
+
+ /**
+ * Get the low hight priority for a particular vertex. Default
+ * to the low limit priority minus two.
+ * @param vertex to get the priority of
+ * @return the priority
+ */
+ public int getPriorityHighLimit(final DAG dag, final Vertex vertex) {
+ return getPriorityLowLimit(dag, vertex) - 2;
+ }
+
}
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index bd5e0ff..6dcc7f0 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -42,6 +42,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.Utils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.LimitExceededException;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
@@ -1620,6 +1621,9 @@
}
}
+ // This is going to override the previously generated file
+ // which didn't have the priorities
+ Utils.generateDAGVizFile(this, jobPlan, dagScheduler);
return DAGState.INITED;
}
@@ -2382,6 +2386,11 @@
}
}
+ @Override
+ public DAGScheduler getDAGScheduler() {
+ return dagScheduler;
+ }
+
// output of either vertex or vertex group
public static class OutputKey {
String outputName;
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
index 3a16f46..2383db8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrder.java
@@ -46,11 +46,10 @@
public void scheduleTaskEx(DAGEventSchedulerUpdate event) {
TaskAttempt attempt = event.getAttempt();
Vertex vertex = dag.getVertex(attempt.getVertexID());
- int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
// natural priority. Handles failures and retries.
- int priorityLowLimit = ((vertexDistanceFromRoot + 1) * dag.getTotalVertices() * 3) + (vertex.getVertexId().getId() * 3);
- int priorityHighLimit = priorityLowLimit - 2;
+ int priorityLowLimit = getPriorityLowLimit(dag, vertex);
+ int priorityHighLimit = getPriorityHighLimit(dag, vertex);
if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling " + attempt.getID() + " between priorityLow: " + priorityLowLimit
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
index 34cc92f..c51783b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGSchedulerNaturalOrderControlled.java
@@ -76,11 +76,10 @@
public void scheduleTaskEx(DAGEventSchedulerUpdate event) {
TaskAttempt attempt = event.getAttempt();
Vertex vertex = dag.getVertex(attempt.getVertexID());
- int vertexDistanceFromRoot = vertex.getDistanceFromRoot();
// natural priority. Handles failures and retries.
- int priorityLowLimit = ((vertexDistanceFromRoot + 1) * dag.getTotalVertices() * 3) + (vertex.getVertexId().getId() * 3);
- int priorityHighLimit = priorityLowLimit - 2;
+ int priorityLowLimit = getPriorityLowLimit(dag, vertex);
+ int priorityHighLimit = getPriorityHighLimit(dag, vertex);
TaskAttemptEventSchedule attemptEvent = new TaskAttemptEventSchedule(
attempt.getID(), priorityLowLimit, priorityHighLimit);
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
index f38f689..07c361a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGScheduler.java
@@ -58,14 +58,19 @@
TaskAttempt mockAttempt = mock(TaskAttempt.class);
when(mockDag.getVertex((TezVertexID) any())).thenReturn(mockVertex);
when(mockDag.getTotalVertices()).thenReturn(4);
- when(mockVertex.getDistanceFromRoot()).thenReturn(0).thenReturn(1)
- .thenReturn(2);
+ when(mockVertex.getDistanceFromRoot())
+ .thenReturn(0).thenReturn(0)
+ .thenReturn(1).thenReturn(1)
+ .thenReturn(2).thenReturn(2);
TezVertexID vId0 = TezVertexID.fromString("vertex_1436907267600_195589_1_00");
TezVertexID vId1 = TezVertexID.fromString("vertex_1436907267600_195589_1_01");
TezVertexID vId2 = TezVertexID.fromString("vertex_1436907267600_195589_1_02");
TezVertexID vId3 = TezVertexID.fromString("vertex_1436907267600_195589_1_03");
- when(mockVertex.getVertexId()).thenReturn(vId0).thenReturn(vId1)
- .thenReturn(vId2).thenReturn(vId3);
+ when(mockVertex.getVertexId())
+ .thenReturn(vId0).thenReturn(vId0)
+ .thenReturn(vId1).thenReturn(vId1)
+ .thenReturn(vId2).thenReturn(vId2)
+ .thenReturn(vId3).thenReturn(vId3);
DAGEventSchedulerUpdate event = new DAGEventSchedulerUpdate(
DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, mockAttempt);