TEZ-3511. Allow user to create named edge
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index c136811..9151d48 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -273,6 +273,13 @@
 
   /**
    * Add an {@link Edge} connecting vertices in the DAG
+   *
+   * All edges within a DAG must be either named (created via
+   * {@link org.apache.tez.dag.api.Edge#create(Vertex, Vertex, EdgeProperty, String)}) or unnamed
+   * (created via {@link org.apache.tez.dag.api.Edge#create(Vertex, Vertex, EdgeProperty)}).
+   * If edges are named, all inbound edges to a vertex should have unique names. Likewise for outbound edges.
+   * A vertex can have an inbound edge that uses the same name as that used by an outbound edge.
+   *
    * @param edge The edge to be added
    * @return {@link DAG}
    */
@@ -571,8 +578,6 @@
     // check for valid vertices, duplicate vertex names,
     // and prepare for cycle detection
     Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
-    Map<Vertex, Set<String>> inboundVertexMap = new HashMap<Vertex, Set<String>>();
-    Map<Vertex, Set<String>> outboundVertexMap = new HashMap<Vertex, Set<String>>();
     for (Vertex v : vertices.values()) {
       if (vertexMap.containsKey(v.getName())) {
         throw new IllegalStateException("DAG contains multiple vertices"
@@ -581,33 +586,45 @@
       vertexMap.put(v.getName(), new AnnotatedVertex(v));
     }
 
-    Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>();
+    // named edge cannot be mixed with unnamed edge or group edge
+    Edge namedEdge = null, unnamedEdge = null;
+    for (Edge e : edges) {
+      if (e.getName() == null) {
+        unnamedEdge = e;
+      } else {
+        namedEdge = e;
+      }
+
+      if (namedEdge != null && !groupInputEdges.isEmpty()) {
+        throw new IllegalStateException("DAG shouldn't contains both named edge " + namedEdge
+          + " and group edge " + groupInputEdges.iterator().next());
+      }
+      if (namedEdge != null && unnamedEdge != null) {
+        throw new IllegalStateException("DAG shouldn't contains both named edge " + namedEdge
+          + " and unnamed edge " + unnamedEdge);
+      }
+    }
+
+    Map<Vertex, List<Edge>> inEdgeMap = new HashMap<>();
+    Map<Vertex, List<Edge>> outEdgeMap = new HashMap<>();
     for (Edge e : edges) {
       // Construct structure for cycle detection
       Vertex inputVertex = e.getInputVertex();
       Vertex outputVertex = e.getOutputVertex();      
-      List<Edge> edgeList = edgeMap.get(inputVertex);
-      if (edgeList == null) {
-        edgeList = new ArrayList<Edge>();
-        edgeMap.put(inputVertex, edgeList);
+
+      List<Edge> outEdgeList = outEdgeMap.get(inputVertex);
+      if (outEdgeList == null) {
+        outEdgeList = new ArrayList<Edge>();
+        outEdgeMap.put(inputVertex, outEdgeList);
       }
-      edgeList.add(e);
-      
-      // Construct map for Input name verification
-      Set<String> inboundSet = inboundVertexMap.get(outputVertex);
-      if (inboundSet == null) {
-        inboundSet = new HashSet<String>();
-        inboundVertexMap.put(outputVertex, inboundSet);
+      outEdgeList.add(e);
+
+      List<Edge> inEdgeList = inEdgeMap.get(outputVertex);
+      if (inEdgeList == null) {
+        inEdgeList = new ArrayList<Edge>();
+        inEdgeMap.put(outputVertex, inEdgeList);
       }
-      inboundSet.add(inputVertex.getName());
-      
-      // Construct map for Output name verification
-      Set<String> outboundSet = outboundVertexMap.get(inputVertex);
-      if (outboundSet == null) {
-        outboundSet = new HashSet<String>();
-        outboundVertexMap.put(inputVertex, outboundSet);
-      }
-      outboundSet.add(outputVertex.getName());
+      inEdgeList.add(e);
     }
 
     // check input and output names don't collide with vertex names
@@ -633,29 +650,54 @@
     }
 
     // Check for valid InputNames
-    for (Entry<Vertex, Set<String>> entry : inboundVertexMap.entrySet()) {
+    for (Entry<Vertex, List<Edge>> entry : inEdgeMap.entrySet()) {
       Vertex vertex = entry.getKey();
+      Set<String> inputs = new HashSet<>();
+
+      for (Edge edge : entry.getValue()) {
+        String name = edge.getName();
+        if (name == null) {
+          name = edge.getInputVertex().getName();
+        }
+        if (inputs.contains(name)) {
+          throw new IllegalStateException("Vertex: " + vertex.getName() + " contains multiple " +
+            "incoming edges with name " + name);
+        }
+        inputs.add(name);
+      }
       for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> 
            input : vertex.getInputs()) {
-        if (entry.getValue().contains(input.getName())) {
+        if (inputs.contains(input.getName())) {
           throw new IllegalStateException("Vertex: "
               + vertex.getName()
-              + " contains an incoming vertex and Input with the same name: "
-              + input.getName());
+              + " contains an incoming " + (namedEdge != null ? "edge" : "vertex")
+              + " and Input with the same name: " + input.getName());
         }
       }
     }
 
-    // Check for valid OutputNames
-    for (Entry<Vertex, Set<String>> entry : outboundVertexMap.entrySet()) {
+    for (Entry<Vertex, List<Edge>> entry : outEdgeMap.entrySet()) {
       Vertex vertex = entry.getKey();
-      for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor> 
-            output : vertex.getOutputs()) {
-        if (entry.getValue().contains(output.getName())) {
+      Set<String> outputs = new HashSet<>();
+
+      for (Edge edge : entry.getValue()) {
+        String name = edge.getName();
+        if (name == null) {
+          name = edge.getOutputVertex().getName();
+        }
+        if (outputs.contains(name)) {
+          throw new IllegalStateException("Vertex: " + vertex.getName() + " contains multiple " +
+            "outgoing edges with name " + name);
+        }
+        outputs.add(name);
+      }
+      for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>
+        output : vertex.getOutputs()) {
+        if (outputs.contains(output.getName())) {
           throw new IllegalStateException("Vertex: "
-              + vertex.getName()
-              + " contains an outgoing vertex and Output with the same name: "
-              + output.getName());
+            + vertex.getName()
+            + " contains an outgoing " + (namedEdge != null ? "edge" : "vertex")
+            + " and Output with the same name: " + output.getName());
         }
       }
     }
@@ -666,7 +708,7 @@
     // When additional inputs are supported, this can be chceked easily (and early)
     // within the addInput / addOutput call itself.
 
-    Deque<String> topologicalVertexStack = detectCycles(edgeMap, vertexMap);
+    Deque<String> topologicalVertexStack = detectCycles(outEdgeMap, vertexMap);
 
     checkAndInferOneToOneParallelism();
 
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
index db509f7..794d88d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
@@ -24,7 +24,7 @@
  * Edge defines the connection between a producer and consumer vertex in the DAG.
  * @link {@link EdgeProperty} defines the relationship between them. The producer
  * vertex provides input to the edge and the consumer vertex reads output from the 
- * edge.
+ * edge. Edge could be either named or not.
  * 
  */
 @Public
@@ -33,13 +33,16 @@
   private final Vertex inputVertex;
   private final Vertex outputVertex;
   private final EdgeProperty edgeProperty;
+  private final String name;
 
   private Edge(Vertex inputVertex,
                Vertex outputVertex,
-               EdgeProperty edgeProperty) {
+               EdgeProperty edgeProperty,
+               String name) {
     this.inputVertex = inputVertex;
     this.outputVertex = outputVertex;
     this.edgeProperty = edgeProperty;
+    this.name = name;
   }
 
 
@@ -57,7 +60,25 @@
   public static Edge create(Vertex inputVertex,
                             Vertex outputVertex,
                             EdgeProperty edgeProperty) {
-    return new Edge(inputVertex, outputVertex, edgeProperty);
+    return new Edge(inputVertex, outputVertex, edgeProperty, null);
+  }
+
+  /**
+   * Creates an edge with specified name between the specified vertices.
+   *
+   * InputVertex(EdgeInput) ----- Edge ----- OutputVertex(EdgeOutput)]
+   *
+   * @param inputVertex the vertex which generates data to the edge.
+   * @param outputVertex the vertex which consumes data from the edge
+   * @param edgeProperty {@link org.apache.tez.dag.api.EdgeProperty} associated with this edge
+   * @param name name of edge
+   * @return the {@link org.apache.tez.dag.api.Edge}
+   */
+  public static Edge create(Vertex inputVertex,
+                            Vertex outputVertex,
+                            EdgeProperty edgeProperty,
+                            String name) {
+    return new Edge(inputVertex, outputVertex, edgeProperty, name);
   }
 
   /**
@@ -83,6 +104,14 @@
   public EdgeProperty getEdgeProperty() {
     return edgeProperty;
   }
+
+  /**
+   * The name of this edge (or null if edge has no name)
+   * @return edge name or null
+   */
+  public String getName() {
+    return name;
+  }
   
   /*
    * Used to identify the edge in the configuration
@@ -95,39 +124,29 @@
  
   @Override
   public String toString() {
-    return inputVertex + " -> " + outputVertex + " (" + edgeProperty + ")";
+    return "{" + name + " : " + inputVertex + " -> " + outputVertex + " " + edgeProperty + "}";
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) return true;
+    if (other == null || getClass() != other.getClass()) return false;
+
+    Edge edge = (Edge) other;
+
+    if (inputVertex != null ? !inputVertex.equals(edge.inputVertex) : edge.inputVertex != null)
+      return false;
+    if (outputVertex != null ? !outputVertex.equals(edge.outputVertex) : edge.outputVertex != null)
+      return false;
+    return name != null ? name.equals(edge.name) : edge.name == null;
+
   }
 
   @Override
   public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result
-        + ((inputVertex == null) ? 0 : inputVertex.hashCode());
-    result = prime * result
-        + ((outputVertex == null) ? 0 : outputVertex.hashCode());
+    int result = inputVertex != null ? inputVertex.hashCode() : 0;
+    result = 31 * result + (outputVertex != null ? outputVertex.hashCode() : 0);
+    result = 31 * result + (name != null ? name.hashCode() : 0);
     return result;
   }
-
-  @Override
-  public boolean equals(Object obj) {
-    if (this == obj)
-      return true;
-    if (obj == null)
-      return false;
-    if (getClass() != obj.getClass())
-      return false;
-    Edge other = (Edge) obj;
-    if (inputVertex == null) {
-      if (other.inputVertex != null)
-        return false;
-    } else if (!inputVertex.equals(other.inputVertex))
-      return false;
-    if (outputVertex == null) {
-      if (other.outputVertex != null)
-        return false;
-    } else if (!outputVertex.equals(other.outputVertex))
-      return false;
-    return true;
-  }
 }
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
index 07fb2c1..3723433 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
@@ -243,10 +243,10 @@
   
   @Override
   public String toString() {
-    return "{ " + dataMovementType + " : " + inputDescriptor.getClassName()
+    return "(" + dataMovementType + " : " + inputDescriptor.getClassName()
         + " >> " + dataSourceType + " >> " + outputDescriptor.getClassName()
         + " >> " + (edgeManagerDescriptor == null ? "NullEdgeManager" : edgeManagerDescriptor.getClassName())
-        + " }";
+        + ")";
   }
   
 }
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index 05c4e30..691b4c1 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -88,6 +88,32 @@
   }
 
   @Test(timeout = 5000)
+  public void testAddDuplicatedNamedEdge() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+
+    Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "Edge1");
+    Edge edge2 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "Edge1");
+
+    DAG dag = DAG.create("testDAG").addVertex(v1).addVertex(v2).addEdge(edge1);
+
+    try {
+      dag.addEdge(edge2);
+      Assert.fail("should fail it due to duplicate named edges");
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains(edge1 + " already defined"));
+    }
+  }
+
+  @Test(timeout = 5000)
   public void testDuplicatedVertexGroup() {
     Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
         dummyTaskCount, dummyTaskResource);
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 5706542..720820d 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -1191,4 +1191,253 @@
       Assert.assertTrue(e.getMessage().contains("There is conflicting local resource"));
     }
   }
+
+  @Test(timeout = 5000)
+  public void testNamedEdge() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+
+
+    Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "Edge1");
+
+    DAG dag = DAG.create("testDAG").addVertex(v1).addVertex(v2).addEdge(edge1);
+    dag.verify();
+
+    Edge edge2 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "Edge2");
+
+    dag.addEdge(edge2).verify();
+  }
+
+  @Test(timeout = 5000)
+  public void testNamedEdgeMixedWithUnnamedEdge() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+
+
+    Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "Edge1");
+    Edge edge2 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")));
+
+    DAG dag = DAG.create("testDAG").addVertex(v1).addVertex(v2).addEdge(edge1).addEdge(edge2);
+
+    try {
+      dag.verify();
+      Assert.fail("should fail it because DAG has both named and unnamed edge");
+    } catch (Exception e) {
+      Assert.assertTrue(
+        e.getMessage().contains(
+          "DAG shouldn't contains both named edge " + edge1 + " and unnamed edge " + edge2));
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testNamedEdgeWithGroupEdge() {
+    Vertex v1 = Vertex.create("v1",
+      ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2",
+      ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v3 = Vertex.create("v3",
+      ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+
+    DAG dag = DAG.create("testDag");
+    dag.addVertex(v1);
+    dag.addVertex(v2);
+    dag.addVertex(v3);
+    String groupName1 = "uv12";
+    VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
+
+    GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
+      EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+        OutputDescriptor.create("dummy output class"),
+        InputDescriptor.create("dummy input class")),
+      InputDescriptor.create("dummy input class"));
+
+    Edge e2 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.CONCURRENT, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "e2");
+
+    dag.addEdge(e1).addEdge(e2);
+    try {
+      dag.verify();
+      Assert.fail("should fail it because DAG has both named edge and group edge");
+    } catch (Exception e) {
+      Assert.assertTrue(
+        e.getMessage().contains("DAG shouldn't contains both named edge " + e2 + " and group edge "
+          + e1));
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testInNamedEdgeCollide() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+
+    Edge edge1 = Edge.create(v1, v3, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "Edge1");
+    Edge edge2 = Edge.create(v2, v3, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "Edge1");
+
+    DAG dag =
+      DAG.create("testDAG").addVertex(v1).addVertex(v2).addVertex(v3).addEdge(edge1).addEdge(edge2);
+
+    try {
+      dag.verify();
+      Assert.fail("should fail it because v3 gets multiple incoming edges with same name");
+    } catch (Exception e) {
+      Assert.assertTrue(
+        e.getMessage().contains("v3 contains multiple incoming edges with name Edge1"));
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testOutNamedEdgeCollide() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+
+    Edge edge1 = Edge.create(v1, v3, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "Edge1");
+    Edge edge2 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "Edge1");
+
+    DAG dag =
+      DAG.create("testDAG").addVertex(v1).addVertex(v2).addVertex(v3).addEdge(edge1).addEdge(edge2);
+
+    try {
+      dag.verify();
+      Assert.fail("should fail it because v3 gets multiple outgoing edges with same name");
+    } catch (Exception e) {
+      Assert.assertTrue(
+        e.getMessage().contains("v1 contains multiple outgoing edges with name Edge1"));
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testInEdgeOutEdgeWithSameName() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+
+    Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "Edge1");
+    Edge edge2 = Edge.create(v2, v3, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "Edge1");
+
+    DAG.create("testDAG")
+      .addVertex(v1).addVertex(v2).addVertex(v3)
+      .addEdge(edge1).addEdge(edge2)
+      .verify();
+  }
+
+  @Test(timeout = 5000)
+  public void testNamedEdgeCollideWithRootInput() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    v2.addDataSource("input",
+      DataSourceDescriptor.create(InputDescriptor.create("input"), null, null));
+
+    Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "input");
+
+    DAG dag = DAG.create("testDag").addVertex(v1).addVertex(v2).addEdge(edge1);
+
+    try {
+      dag.verify();
+      Assert.fail("should fail it because v2 get incoming edge and input with same name");
+    } catch (Exception e) {
+      Assert.assertTrue(
+        e.getMessage().contains("v2 contains an incoming edge and Input with the same name: input"));
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testNamedEdgeCollideWithLeafOutput() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    v1.addDataSink("output",
+      DataSinkDescriptor.create(OutputDescriptor.create("output"), null, null));
+
+    Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "output");
+
+    DAG dag = DAG.create("testDag").addVertex(v1).addVertex(v2).addEdge(edge1);
+
+    try {
+      dag.verify();
+      Assert.fail("should fail it because v2 get outgoing edge and output with same name");
+    } catch (Exception e) {
+      Assert.assertTrue(e.getMessage().contains(
+        "v1 contains an outgoing edge and Output with the same name: output"));
+    }
+  }
+
+  @Test(timeout = 5000)
+  public void testNamedEdgeUsingVertexName() {
+    Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+    Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+      dummyTaskCount, dummyTaskResource);
+
+    Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "v1");
+    Edge edge2 = Edge.create(v1, v2, EdgeProperty.create(
+      DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+      SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+      InputDescriptor.create("input")), "v2");
+
+    DAG dag = DAG.create("testDag").addVertex(v1).addVertex(v2).addEdge(edge1).addEdge(edge2);
+    dag.verify();
+  }
 }
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestEdge.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestEdge.java
new file mode 100644
index 0000000..61f4fbc
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestEdge.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestEdge {
+  Vertex v1, v2;
+  EdgeProperty edgeProperty;
+  Set<Edge> set;
+
+  @Before
+  public void setup() {
+    v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"));
+    v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"));
+    edgeProperty = EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER,
+      EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.CONCURRENT,
+      OutputDescriptor.create("output"), InputDescriptor.create("input"));
+    set = new HashSet<>();
+  }
+
+  @Test(timeout = 5000)
+  public void testHashAndEqualsUnnamed() {
+    // edges without name but everything else same are equal and have same hash
+    Edge e1 = Edge.create(v1, v2, edgeProperty);
+    Edge e2 = Edge.create(v1, v2, edgeProperty);
+    assertEquals(e1, e2);
+    set.add(e1);
+    assertTrue(set.contains(e2));
+  }
+
+  @Test(timeout = 5000)
+  public void testHashAndEqualsNamed() {
+    // edges with everything same including name are equal and have same hash
+    Edge e1 = Edge.create(v1, v2, edgeProperty, "e1");
+    Edge e2 = Edge.create(v1, v2, edgeProperty, "e1");
+    assertEquals(e1, e2);
+    set.add(e1);
+    assertTrue(set.contains(e2));
+  }
+
+  @Test(timeout = 5000)
+  public void testHashAndEqualsDifferentName() {
+    // edges with different name but everything else same are not equal and have different hash
+    Edge e1 = Edge.create(v1, v2, edgeProperty, "e1");
+    Edge e2 = Edge.create(v1, v2, edgeProperty, "e2");
+    assertNotEquals(e1, e2);
+    set.add(e1);
+    assertFalse(set.contains(e2));
+  }
+}