TEZ-3511. Allow user to create named edge
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index c136811..9151d48 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -273,6 +273,13 @@
/**
* Add an {@link Edge} connecting vertices in the DAG
+ *
+ * All edges within a DAG must be either named (created via
+ * {@link org.apache.tez.dag.api.Edge#create(Vertex, Vertex, EdgeProperty, String)}) or unnamed
+ * (created via {@link org.apache.tez.dag.api.Edge#create(Vertex, Vertex, EdgeProperty)}).
+ * If edges are named, all inbound edges to a vertex should have unique names. Likewise for outbound edges.
+ * A vertex can have an inbound edge that uses the same name as that used by an outbound edge.
+ *
* @param edge The edge to be added
* @return {@link DAG}
*/
@@ -571,8 +578,6 @@
// check for valid vertices, duplicate vertex names,
// and prepare for cycle detection
Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
- Map<Vertex, Set<String>> inboundVertexMap = new HashMap<Vertex, Set<String>>();
- Map<Vertex, Set<String>> outboundVertexMap = new HashMap<Vertex, Set<String>>();
for (Vertex v : vertices.values()) {
if (vertexMap.containsKey(v.getName())) {
throw new IllegalStateException("DAG contains multiple vertices"
@@ -581,33 +586,45 @@
vertexMap.put(v.getName(), new AnnotatedVertex(v));
}
- Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>();
+ // named edge cannot be mixed with unnamed edge or group edge
+ Edge namedEdge = null, unnamedEdge = null;
+ for (Edge e : edges) {
+ if (e.getName() == null) {
+ unnamedEdge = e;
+ } else {
+ namedEdge = e;
+ }
+
+ if (namedEdge != null && !groupInputEdges.isEmpty()) {
+ throw new IllegalStateException("DAG shouldn't contains both named edge " + namedEdge
+ + " and group edge " + groupInputEdges.iterator().next());
+ }
+ if (namedEdge != null && unnamedEdge != null) {
+ throw new IllegalStateException("DAG shouldn't contains both named edge " + namedEdge
+ + " and unnamed edge " + unnamedEdge);
+ }
+ }
+
+ Map<Vertex, List<Edge>> inEdgeMap = new HashMap<>();
+ Map<Vertex, List<Edge>> outEdgeMap = new HashMap<>();
for (Edge e : edges) {
// Construct structure for cycle detection
Vertex inputVertex = e.getInputVertex();
Vertex outputVertex = e.getOutputVertex();
- List<Edge> edgeList = edgeMap.get(inputVertex);
- if (edgeList == null) {
- edgeList = new ArrayList<Edge>();
- edgeMap.put(inputVertex, edgeList);
+
+ List<Edge> outEdgeList = outEdgeMap.get(inputVertex);
+ if (outEdgeList == null) {
+ outEdgeList = new ArrayList<Edge>();
+ outEdgeMap.put(inputVertex, outEdgeList);
}
- edgeList.add(e);
-
- // Construct map for Input name verification
- Set<String> inboundSet = inboundVertexMap.get(outputVertex);
- if (inboundSet == null) {
- inboundSet = new HashSet<String>();
- inboundVertexMap.put(outputVertex, inboundSet);
+ outEdgeList.add(e);
+
+ List<Edge> inEdgeList = inEdgeMap.get(outputVertex);
+ if (inEdgeList == null) {
+ inEdgeList = new ArrayList<Edge>();
+ inEdgeMap.put(outputVertex, inEdgeList);
}
- inboundSet.add(inputVertex.getName());
-
- // Construct map for Output name verification
- Set<String> outboundSet = outboundVertexMap.get(inputVertex);
- if (outboundSet == null) {
- outboundSet = new HashSet<String>();
- outboundVertexMap.put(inputVertex, outboundSet);
- }
- outboundSet.add(outputVertex.getName());
+ inEdgeList.add(e);
}
// check input and output names don't collide with vertex names
@@ -633,29 +650,54 @@
}
// Check for valid InputNames
- for (Entry<Vertex, Set<String>> entry : inboundVertexMap.entrySet()) {
+ for (Entry<Vertex, List<Edge>> entry : inEdgeMap.entrySet()) {
Vertex vertex = entry.getKey();
+ Set<String> inputs = new HashSet<>();
+
+ for (Edge edge : entry.getValue()) {
+ String name = edge.getName();
+ if (name == null) {
+ name = edge.getInputVertex().getName();
+ }
+ if (inputs.contains(name)) {
+ throw new IllegalStateException("Vertex: " + vertex.getName() + " contains multiple " +
+ "incoming edges with name " + name);
+ }
+ inputs.add(name);
+ }
for (RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>
input : vertex.getInputs()) {
- if (entry.getValue().contains(input.getName())) {
+ if (inputs.contains(input.getName())) {
throw new IllegalStateException("Vertex: "
+ vertex.getName()
- + " contains an incoming vertex and Input with the same name: "
- + input.getName());
+ + " contains an incoming " + (namedEdge != null ? "edge" : "vertex")
+ + " and Input with the same name: " + input.getName());
}
}
}
- // Check for valid OutputNames
- for (Entry<Vertex, Set<String>> entry : outboundVertexMap.entrySet()) {
+ for (Entry<Vertex, List<Edge>> entry : outEdgeMap.entrySet()) {
Vertex vertex = entry.getKey();
- for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>
- output : vertex.getOutputs()) {
- if (entry.getValue().contains(output.getName())) {
+ Set<String> outputs = new HashSet<>();
+
+ for (Edge edge : entry.getValue()) {
+ String name = edge.getName();
+ if (name == null) {
+ name = edge.getOutputVertex().getName();
+ }
+ if (outputs.contains(name)) {
+ throw new IllegalStateException("Vertex: " + vertex.getName() + " contains multiple " +
+ "outgoing edges with name " + name);
+ }
+ outputs.add(name);
+ }
+ for (RootInputLeafOutput<OutputDescriptor, OutputCommitterDescriptor>
+ output : vertex.getOutputs()) {
+ if (outputs.contains(output.getName())) {
throw new IllegalStateException("Vertex: "
- + vertex.getName()
- + " contains an outgoing vertex and Output with the same name: "
- + output.getName());
+ + vertex.getName()
+ + " contains an outgoing " + (namedEdge != null ? "edge" : "vertex")
+ + " and Output with the same name: " + output.getName());
}
}
}
@@ -666,7 +708,7 @@
// When additional inputs are supported, this can be chceked easily (and early)
// within the addInput / addOutput call itself.
- Deque<String> topologicalVertexStack = detectCycles(edgeMap, vertexMap);
+ Deque<String> topologicalVertexStack = detectCycles(outEdgeMap, vertexMap);
checkAndInferOneToOneParallelism();
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
index db509f7..794d88d 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/Edge.java
@@ -24,7 +24,7 @@
* Edge defines the connection between a producer and consumer vertex in the DAG.
* @link {@link EdgeProperty} defines the relationship between them. The producer
* vertex provides input to the edge and the consumer vertex reads output from the
- * edge.
+ * edge. Edge could be either named or not.
*
*/
@Public
@@ -33,13 +33,16 @@
private final Vertex inputVertex;
private final Vertex outputVertex;
private final EdgeProperty edgeProperty;
+ private final String name;
private Edge(Vertex inputVertex,
Vertex outputVertex,
- EdgeProperty edgeProperty) {
+ EdgeProperty edgeProperty,
+ String name) {
this.inputVertex = inputVertex;
this.outputVertex = outputVertex;
this.edgeProperty = edgeProperty;
+ this.name = name;
}
@@ -57,7 +60,25 @@
public static Edge create(Vertex inputVertex,
Vertex outputVertex,
EdgeProperty edgeProperty) {
- return new Edge(inputVertex, outputVertex, edgeProperty);
+ return new Edge(inputVertex, outputVertex, edgeProperty, null);
+ }
+
+ /**
+ * Creates an edge with specified name between the specified vertices.
+ *
+ * InputVertex(EdgeInput) ----- Edge ----- OutputVertex(EdgeOutput)]
+ *
+ * @param inputVertex the vertex which generates data to the edge.
+ * @param outputVertex the vertex which consumes data from the edge
+ * @param edgeProperty {@link org.apache.tez.dag.api.EdgeProperty} associated with this edge
+ * @param name name of edge
+ * @return the {@link org.apache.tez.dag.api.Edge}
+ */
+ public static Edge create(Vertex inputVertex,
+ Vertex outputVertex,
+ EdgeProperty edgeProperty,
+ String name) {
+ return new Edge(inputVertex, outputVertex, edgeProperty, name);
}
/**
@@ -83,6 +104,14 @@
public EdgeProperty getEdgeProperty() {
return edgeProperty;
}
+
+ /**
+ * The name of this edge (or null if edge has no name)
+ * @return edge name or null
+ */
+ public String getName() {
+ return name;
+ }
/*
* Used to identify the edge in the configuration
@@ -95,39 +124,29 @@
@Override
public String toString() {
- return inputVertex + " -> " + outputVertex + " (" + edgeProperty + ")";
+ return "{" + name + " : " + inputVertex + " -> " + outputVertex + " " + edgeProperty + "}";
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) return true;
+ if (other == null || getClass() != other.getClass()) return false;
+
+ Edge edge = (Edge) other;
+
+ if (inputVertex != null ? !inputVertex.equals(edge.inputVertex) : edge.inputVertex != null)
+ return false;
+ if (outputVertex != null ? !outputVertex.equals(edge.outputVertex) : edge.outputVertex != null)
+ return false;
+ return name != null ? name.equals(edge.name) : edge.name == null;
+
}
@Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result
- + ((inputVertex == null) ? 0 : inputVertex.hashCode());
- result = prime * result
- + ((outputVertex == null) ? 0 : outputVertex.hashCode());
+ int result = inputVertex != null ? inputVertex.hashCode() : 0;
+ result = 31 * result + (outputVertex != null ? outputVertex.hashCode() : 0);
+ result = 31 * result + (name != null ? name.hashCode() : 0);
return result;
}
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- Edge other = (Edge) obj;
- if (inputVertex == null) {
- if (other.inputVertex != null)
- return false;
- } else if (!inputVertex.equals(other.inputVertex))
- return false;
- if (outputVertex == null) {
- if (other.outputVertex != null)
- return false;
- } else if (!outputVertex.equals(other.outputVertex))
- return false;
- return true;
- }
}
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
index 07fb2c1..3723433 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/EdgeProperty.java
@@ -243,10 +243,10 @@
@Override
public String toString() {
- return "{ " + dataMovementType + " : " + inputDescriptor.getClassName()
+ return "(" + dataMovementType + " : " + inputDescriptor.getClassName()
+ " >> " + dataSourceType + " >> " + outputDescriptor.getClassName()
+ " >> " + (edgeManagerDescriptor == null ? "NullEdgeManager" : edgeManagerDescriptor.getClassName())
- + " }";
+ + ")";
}
}
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
index 05c4e30..691b4c1 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAG.java
@@ -88,6 +88,32 @@
}
@Test(timeout = 5000)
+ public void testAddDuplicatedNamedEdge() {
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+
+ Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "Edge1");
+ Edge edge2 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "Edge1");
+
+ DAG dag = DAG.create("testDAG").addVertex(v1).addVertex(v2).addEdge(edge1);
+
+ try {
+ dag.addEdge(edge2);
+ Assert.fail("should fail it due to duplicate named edges");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains(edge1 + " already defined"));
+ }
+ }
+
+ @Test(timeout = 5000)
public void testDuplicatedVertexGroup() {
Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
dummyTaskCount, dummyTaskResource);
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
index 5706542..720820d 100644
--- a/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestDAGVerify.java
@@ -1191,4 +1191,253 @@
Assert.assertTrue(e.getMessage().contains("There is conflicting local resource"));
}
}
+
+ @Test(timeout = 5000)
+ public void testNamedEdge() {
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+
+
+ Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "Edge1");
+
+ DAG dag = DAG.create("testDAG").addVertex(v1).addVertex(v2).addEdge(edge1);
+ dag.verify();
+
+ Edge edge2 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "Edge2");
+
+ dag.addEdge(edge2).verify();
+ }
+
+ @Test(timeout = 5000)
+ public void testNamedEdgeMixedWithUnnamedEdge() {
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+
+
+ Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "Edge1");
+ Edge edge2 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")));
+
+ DAG dag = DAG.create("testDAG").addVertex(v1).addVertex(v2).addEdge(edge1).addEdge(edge2);
+
+ try {
+ dag.verify();
+ Assert.fail("should fail it because DAG has both named and unnamed edge");
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage().contains(
+ "DAG shouldn't contains both named edge " + edge1 + " and unnamed edge " + edge2));
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testNamedEdgeWithGroupEdge() {
+ Vertex v1 = Vertex.create("v1",
+ ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = Vertex.create("v2",
+ ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v3 = Vertex.create("v3",
+ ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+
+ DAG dag = DAG.create("testDag");
+ dag.addVertex(v1);
+ dag.addVertex(v2);
+ dag.addVertex(v3);
+ String groupName1 = "uv12";
+ VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
+
+ GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
+ EdgeProperty.create(DataMovementType.SCATTER_GATHER,
+ DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
+ OutputDescriptor.create("dummy output class"),
+ InputDescriptor.create("dummy input class")),
+ InputDescriptor.create("dummy input class"));
+
+ Edge e2 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.CONCURRENT, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "e2");
+
+ dag.addEdge(e1).addEdge(e2);
+ try {
+ dag.verify();
+ Assert.fail("should fail it because DAG has both named edge and group edge");
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage().contains("DAG shouldn't contains both named edge " + e2 + " and group edge "
+ + e1));
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testInNamedEdgeCollide() {
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+
+ Edge edge1 = Edge.create(v1, v3, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "Edge1");
+ Edge edge2 = Edge.create(v2, v3, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "Edge1");
+
+ DAG dag =
+ DAG.create("testDAG").addVertex(v1).addVertex(v2).addVertex(v3).addEdge(edge1).addEdge(edge2);
+
+ try {
+ dag.verify();
+ Assert.fail("should fail it because v3 gets multiple incoming edges with same name");
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage().contains("v3 contains multiple incoming edges with name Edge1"));
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testOutNamedEdgeCollide() {
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+
+ Edge edge1 = Edge.create(v1, v3, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "Edge1");
+ Edge edge2 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "Edge1");
+
+ DAG dag =
+ DAG.create("testDAG").addVertex(v1).addVertex(v2).addVertex(v3).addEdge(edge1).addEdge(edge2);
+
+ try {
+ dag.verify();
+ Assert.fail("should fail it because v3 gets multiple outgoing edges with same name");
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage().contains("v1 contains multiple outgoing edges with name Edge1"));
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testInEdgeOutEdgeWithSameName() {
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+
+ Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "Edge1");
+ Edge edge2 = Edge.create(v2, v3, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "Edge1");
+
+ DAG.create("testDAG")
+ .addVertex(v1).addVertex(v2).addVertex(v3)
+ .addEdge(edge1).addEdge(edge2)
+ .verify();
+ }
+
+ @Test(timeout = 5000)
+ public void testNamedEdgeCollideWithRootInput() {
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ v2.addDataSource("input",
+ DataSourceDescriptor.create(InputDescriptor.create("input"), null, null));
+
+ Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "input");
+
+ DAG dag = DAG.create("testDag").addVertex(v1).addVertex(v2).addEdge(edge1);
+
+ try {
+ dag.verify();
+ Assert.fail("should fail it because v2 get incoming edge and input with same name");
+ } catch (Exception e) {
+ Assert.assertTrue(
+ e.getMessage().contains("v2 contains an incoming edge and Input with the same name: input"));
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testNamedEdgeCollideWithLeafOutput() {
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ v1.addDataSink("output",
+ DataSinkDescriptor.create(OutputDescriptor.create("output"), null, null));
+
+ Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "output");
+
+ DAG dag = DAG.create("testDag").addVertex(v1).addVertex(v2).addEdge(edge1);
+
+ try {
+ dag.verify();
+ Assert.fail("should fail it because v2 get outgoing edge and output with same name");
+ } catch (Exception e) {
+ Assert.assertTrue(e.getMessage().contains(
+ "v1 contains an outgoing edge and Output with the same name: output"));
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testNamedEdgeUsingVertexName() {
+ Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+ Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"),
+ dummyTaskCount, dummyTaskResource);
+
+ Edge edge1 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "v1");
+ Edge edge2 = Edge.create(v1, v2, EdgeProperty.create(
+ DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
+ SchedulingType.SEQUENTIAL, OutputDescriptor.create("output"),
+ InputDescriptor.create("input")), "v2");
+
+ DAG dag = DAG.create("testDag").addVertex(v1).addVertex(v2).addEdge(edge1).addEdge(edge2);
+ dag.verify();
+ }
}
diff --git a/tez-api/src/test/java/org/apache/tez/dag/api/TestEdge.java b/tez-api/src/test/java/org/apache/tez/dag/api/TestEdge.java
new file mode 100644
index 0000000..61f4fbc
--- /dev/null
+++ b/tez-api/src/test/java/org/apache/tez/dag/api/TestEdge.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.api;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestEdge {
+ Vertex v1, v2;
+ EdgeProperty edgeProperty;
+ Set<Edge> set;
+
+ @Before
+ public void setup() {
+ v1 = Vertex.create("v1", ProcessorDescriptor.create("Processor"));
+ v2 = Vertex.create("v2", ProcessorDescriptor.create("Processor"));
+ edgeProperty = EdgeProperty.create(EdgeProperty.DataMovementType.SCATTER_GATHER,
+ EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.CONCURRENT,
+ OutputDescriptor.create("output"), InputDescriptor.create("input"));
+ set = new HashSet<>();
+ }
+
+ @Test(timeout = 5000)
+ public void testHashAndEqualsUnnamed() {
+ // edges without name but everything else same are equal and have same hash
+ Edge e1 = Edge.create(v1, v2, edgeProperty);
+ Edge e2 = Edge.create(v1, v2, edgeProperty);
+ assertEquals(e1, e2);
+ set.add(e1);
+ assertTrue(set.contains(e2));
+ }
+
+ @Test(timeout = 5000)
+ public void testHashAndEqualsNamed() {
+ // edges with everything same including name are equal and have same hash
+ Edge e1 = Edge.create(v1, v2, edgeProperty, "e1");
+ Edge e2 = Edge.create(v1, v2, edgeProperty, "e1");
+ assertEquals(e1, e2);
+ set.add(e1);
+ assertTrue(set.contains(e2));
+ }
+
+ @Test(timeout = 5000)
+ public void testHashAndEqualsDifferentName() {
+ // edges with different name but everything else same are not equal and have different hash
+ Edge e1 = Edge.create(v1, v2, edgeProperty, "e1");
+ Edge e2 = Edge.create(v1, v2, edgeProperty, "e2");
+ assertNotEquals(e1, e2);
+ set.add(e1);
+ assertFalse(set.contains(e2));
+ }
+}