| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.dag.api; |
| |
| import org.apache.hadoop.yarn.api.records.Resource; |
| import org.apache.tez.dag.api.EdgeProperty.DataMovementType; |
| import org.apache.tez.dag.api.EdgeProperty.DataSourceType; |
| import org.apache.tez.dag.api.EdgeProperty.SchedulingType; |
| import org.junit.Assert; |
| import org.junit.Test; |
| |
| public class TestDAGVerify { |
| |
| private final String dummyProcessorClassName = TestDAGVerify.class.getName(); |
| private final String dummyInputClassName = TestDAGVerify.class.getName(); |
| private final String dummyOutputClassName = TestDAGVerify.class.getName(); |
| private final int dummyTaskCount = 2; |
| private final Resource dummyTaskResource = Resource.newInstance(1, 1); |
| |
| // v1 |
| // | |
| // v2 |
| @Test |
| public void testVerifyScatterGather() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor(dummyProcessorClassName), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Edge e1 = new Edge(v1, v2, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor(dummyOutputClassName), |
| new InputDescriptor(dummyInputClassName))); |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addEdge(e1); |
| dag.verify(); |
| } |
| |
| @Test |
| public void testVerifyCustomEdge() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor(dummyProcessorClassName), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Edge e1 = new Edge(v1, v2, |
| new EdgeProperty(new EdgeManagerDescriptor("emClass"), |
| DataSourceType.PERSISTED, |
| SchedulingType.SEQUENTIAL, |
| new OutputDescriptor(dummyOutputClassName), |
| new InputDescriptor(dummyInputClassName))); |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addEdge(e1); |
| dag.verify(); |
| } |
| |
| @Test |
| public void testVerifyOneToOne() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor(dummyProcessorClassName), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Edge e1 = new Edge(v1, v2, |
| new EdgeProperty(DataMovementType.ONE_TO_ONE, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor(dummyOutputClassName), |
| new InputDescriptor(dummyInputClassName))); |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addEdge(e1); |
| dag.verify(); |
| } |
| |
| @Test |
| public void testVerifyBroadcast() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor(dummyProcessorClassName), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Edge e1 = new Edge(v1, v2, |
| new EdgeProperty(DataMovementType.BROADCAST, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor(dummyOutputClassName), |
| new InputDescriptor(dummyInputClassName))); |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addEdge(e1); |
| dag.verify(); |
| } |
| |
| @Test(expected = IllegalStateException.class) |
| public void testVerify3() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor(dummyProcessorClassName), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Edge e1 = new Edge(v1, v2, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.EPHEMERAL, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor(dummyOutputClassName), |
| new InputDescriptor(dummyInputClassName))); |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addEdge(e1); |
| dag.verify(); |
| } |
| |
| @Test(expected = IllegalStateException.class) |
| public void testVerify4() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor(dummyProcessorClassName), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Edge e1 = new Edge(v1, v2, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.EPHEMERAL, SchedulingType.CONCURRENT, |
| new OutputDescriptor(dummyOutputClassName), |
| new InputDescriptor(dummyInputClassName))); |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addEdge(e1); |
| dag.verify(); |
| } |
| |
| // v1 <---- |
| // | ^ |
| // v2 ^ |
| // | | ^ |
| // v3 v4 |
| @Test |
| public void testCycle1() { |
| IllegalStateException ex=null; |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v3 = new Vertex("v3", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v4 = new Vertex("v4", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Edge e1 = new Edge(v1, v2, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| Edge e2 = new Edge(v2, v3, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| Edge e3 = new Edge(v2, v4, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| Edge e4 = new Edge(v4, v1, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addVertex(v3); |
| dag.addVertex(v4); |
| dag.addEdge(e1); |
| dag.addEdge(e2); |
| dag.addEdge(e3); |
| dag.addEdge(e4); |
| try{ |
| dag.verify(); |
| } |
| catch (IllegalStateException e){ |
| ex = e; |
| } |
| Assert.assertNotNull(ex); |
| System.out.println(ex.getMessage()); |
| Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle")); |
| } |
| |
| // v1 |
| // | |
| // -> v2 |
| // ^ | | |
| // v3 v4 |
| @Test |
| public void testCycle2() { |
| IllegalStateException ex=null; |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v3 = new Vertex("v3", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v4 = new Vertex("v4", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Edge e1 = new Edge(v1, v2, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| Edge e2 = new Edge(v2, v3, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| Edge e3 = new Edge(v2, v4, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| Edge e4 = new Edge(v3, v2, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addVertex(v3); |
| dag.addVertex(v4); |
| dag.addEdge(e1); |
| dag.addEdge(e2); |
| dag.addEdge(e3); |
| dag.addEdge(e4); |
| try{ |
| dag.verify(); |
| } |
| catch (IllegalStateException e){ |
| ex = e; |
| } |
| Assert.assertNotNull(ex); |
| System.out.println(ex.getMessage()); |
| Assert.assertTrue(ex.getMessage().startsWith("DAG contains a cycle")); |
| } |
| |
| @Test |
| public void repeatedVertexName() { |
| IllegalStateException ex=null; |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v1repeat = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| try { |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v1repeat); |
| dag.verify(); |
| } |
| catch (IllegalStateException e){ |
| ex = e; |
| } |
| Assert.assertNotNull(ex); |
| System.out.println(ex.getMessage()); |
| Assert.assertTrue(ex.getMessage().startsWith("Vertex v1 already defined")); |
| } |
| |
| @Test (expected = IllegalStateException.class) |
| public void testInputAndInputVertexNameCollision() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| |
| v2.addInput("v1", new InputDescriptor(), null); |
| |
| Edge e1 = new Edge(v1, v2, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addEdge(e1); |
| dag.verify(); |
| } |
| |
| @Test (expected = IllegalStateException.class) |
| public void testOutputAndOutputVertexNameCollision() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| |
| v1.addOutput("v2", new OutputDescriptor()); |
| |
| Edge e1 = new Edge(v1, v2, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addEdge(e1); |
| dag.verify(); |
| } |
| |
| @Test (expected = IllegalStateException.class) |
| public void testOutputAndVertexNameCollision() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| |
| v1.addOutput("v2", new OutputDescriptor()); |
| |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.verify(); |
| } |
| |
| @Test (expected = IllegalStateException.class) |
| public void testInputAndVertexNameCollision() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| |
| v1.addInput("v2", new InputDescriptor(), null); |
| |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.verify(); |
| } |
| |
| // v1 v2 |
| // | | |
| // v3 |
| @Test |
| public void BinaryInputAllowed() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v3 = new Vertex("v3", |
| new ProcessorDescriptor("ReduceProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Edge e1 = new Edge(v1, v3, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| Edge e2 = new Edge(v2, v3, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addVertex(v3); |
| dag.addEdge(e1); |
| dag.addEdge(e2); |
| dag.verify(); |
| } |
| |
| @Test |
| public void testVertexGroupWithMultipleOutputEdges() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("Processor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("Processor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v3 = new Vertex("v3", |
| new ProcessorDescriptor("Processor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v4 = new Vertex("v4", |
| new ProcessorDescriptor("Processor"), |
| dummyTaskCount, dummyTaskResource); |
| |
| DAG dag = new DAG("testDag"); |
| VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2); |
| OutputDescriptor outDesc = new OutputDescriptor(); |
| uv12.addOutput("uvOut", outDesc, null); |
| |
| GroupInputEdge e1 = new GroupInputEdge(uv12, v3, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class")), |
| new InputDescriptor("dummy input class")); |
| |
| GroupInputEdge e2 = new GroupInputEdge(uv12, v4, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class")), |
| new InputDescriptor("dummy input class")); |
| |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addVertex(v3); |
| dag.addVertex(v4); |
| dag.addEdge(e1); |
| dag.addEdge(e2); |
| dag.verify(); |
| |
| Assert.assertEquals(2, v1.getOutputVertices().size()); |
| Assert.assertEquals(2, v2.getOutputVertices().size()); |
| Assert.assertTrue(v1.getOutputVertices().contains(v3)); |
| Assert.assertTrue(v1.getOutputVertices().contains(v4)); |
| Assert.assertTrue(v2.getOutputVertices().contains(v3)); |
| Assert.assertTrue(v2.getOutputVertices().contains(v4)); |
| } |
| |
| @Test |
| public void testVertexGroup() { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("Processor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("Processor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v3 = new Vertex("v3", |
| new ProcessorDescriptor("Processor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v4 = new Vertex("v4", |
| new ProcessorDescriptor("Processor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v5 = new Vertex("v5", |
| new ProcessorDescriptor("Processor"), |
| dummyTaskCount, dummyTaskResource); |
| |
| DAG dag = new DAG("testDag"); |
| String groupName1 = "uv12"; |
| VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2); |
| OutputDescriptor outDesc = new OutputDescriptor(); |
| uv12.addOutput("uvOut", outDesc, null); |
| |
| String groupName2 = "uv23"; |
| VertexGroup uv23 = dag.createVertexGroup(groupName2, v2, v3); |
| |
| GroupInputEdge e1 = new GroupInputEdge(uv12, v4, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class")), |
| new InputDescriptor("dummy input class")); |
| GroupInputEdge e2 = new GroupInputEdge(uv23, v5, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class")), |
| new InputDescriptor("dummy input class")); |
| |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addVertex(v3); |
| dag.addVertex(v4); |
| dag.addVertex(v5); |
| dag.addEdge(e1); |
| dag.addEdge(e2); |
| dag.verify(); |
| |
| // for the first Group v1 and v2 should get connected to v4 and also have 1 output |
| // for the second Group v2 and v3 should get connected to v5 |
| // the Group place holders should disappear |
| Assert.assertNull(dag.getVertex(uv12.getGroupName())); |
| Assert.assertNull(dag.getVertex(uv23.getGroupName())); |
| Assert.assertFalse(dag.edges.contains(e1)); |
| Assert.assertFalse(dag.edges.contains(e2)); |
| Assert.assertEquals(1, v1.getOutputs().size()); |
| Assert.assertEquals(1, v2.getOutputs().size()); |
| Assert.assertEquals(outDesc, v1.getOutputs().get(0).getDescriptor()); |
| Assert.assertEquals(outDesc, v2.getOutputs().get(0).getDescriptor()); |
| Assert.assertEquals(1, v1.getOutputVertices().size()); |
| Assert.assertEquals(1, v3.getOutputVertices().size()); |
| Assert.assertEquals(2, v2.getOutputVertices().size()); |
| Assert.assertTrue(v1.getOutputVertices().contains(v4)); |
| Assert.assertTrue(v3.getOutputVertices().contains(v5)); |
| Assert.assertTrue(v2.getOutputVertices().contains(v4)); |
| Assert.assertTrue(v2.getOutputVertices().contains(v5)); |
| Assert.assertEquals(2, v4.getInputVertices().size()); |
| Assert.assertTrue(v4.getInputVertices().contains(v1)); |
| Assert.assertTrue(v4.getInputVertices().contains(v2)); |
| Assert.assertEquals(2, v5.getInputVertices().size()); |
| Assert.assertTrue(v5.getInputVertices().contains(v2)); |
| Assert.assertTrue(v5.getInputVertices().contains(v3)); |
| Assert.assertEquals(1, v4.getGroupInputs().size()); |
| Assert.assertTrue(v4.getGroupInputs().containsKey(groupName1)); |
| Assert.assertEquals(1, v5.getGroupInputs().size()); |
| Assert.assertTrue(v5.getGroupInputs().containsKey(groupName2)); |
| Assert.assertEquals(2, dag.vertexGroups.size()); |
| } |
| |
| // v1 |
| // | | |
| // v2 v3 |
| @Test |
| public void BinaryOutput() { |
| IllegalStateException ex = null; |
| try { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v2 = new Vertex("v2", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Vertex v3 = new Vertex("v3", |
| new ProcessorDescriptor("MapProcessor"), |
| dummyTaskCount, dummyTaskResource); |
| Edge e1 = new Edge(v1, v2, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| Edge e2 = new Edge(v1, v2, |
| new EdgeProperty(DataMovementType.SCATTER_GATHER, |
| DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, |
| new OutputDescriptor("dummy output class"), |
| new InputDescriptor("dummy input class"))); |
| DAG dag = new DAG("testDag"); |
| dag.addVertex(v1); |
| dag.addVertex(v2); |
| dag.addVertex(v3); |
| dag.addEdge(e1); |
| dag.addEdge(e2); |
| dag.verify(); |
| } |
| catch (IllegalStateException e){ |
| ex = e; |
| } |
| Assert.assertNull(ex); |
| } |
| |
| @Test |
| public void testDagWithNoVertices() { |
| IllegalStateException ex=null; |
| try { |
| DAG dag = new DAG("testDag"); |
| dag.verify(); |
| } |
| catch (IllegalStateException e){ |
| ex = e; |
| } |
| Assert.assertNotNull(ex); |
| System.out.println(ex.getMessage()); |
| Assert.assertTrue(ex.getMessage() |
| .startsWith("Invalid dag containing 0 vertices")); |
| } |
| |
| @SuppressWarnings("unused") |
| @Test |
| public void testInvalidVertexConstruction() { |
| { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| 0, dummyTaskResource); |
| Vertex v2 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| -1, dummyTaskResource); |
| } |
| try { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| -2, dummyTaskResource); |
| Assert.fail("Expected exception for 0 parallelism"); |
| } catch (IllegalArgumentException e) { |
| Assert |
| .assertTrue(e |
| .getMessage() |
| .startsWith( |
| "Parallelism should be -1 if determined by the AM, otherwise should be >= 0")); |
| } |
| try { |
| Vertex v1 = new Vertex("v1", |
| new ProcessorDescriptor("MapProcessor"), |
| 1, null); |
| Assert.fail("Expected exception for 0 parallelism"); |
| } catch (IllegalArgumentException e) { |
| Assert.assertTrue(e.getMessage().startsWith("Resource cannot be null")); |
| } |
| } |
| |
| } |