blob: 221484a0958d55b4139b3836323908454b1499ef [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.execution;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl;
import org.apache.samza.config.MapConfig;
import org.apache.samza.system.StreamSpec;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
public class TestJobGraph {
JobGraph graph1;
JobGraph graph2;
JobGraph graph3;
JobGraph graph4;
int streamSeq = 0;
private StreamSpec genStream() {
++streamSeq;
return new StreamSpec(String.valueOf(streamSeq), "test-stream", "test-system");
}
/**
* graph1 is the example graph from wikipedia
*
* 5 7 3
* | / | / |
* v v |
* 11 8 |
* | \X /
* v v \v
* 2 9 10
*/
private void createGraph1() {
StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
graph1 = new JobGraph(new MapConfig(), appDesc);
JobNode n2 = graph1.getOrCreateJobNode("2", "1");
JobNode n3 = graph1.getOrCreateJobNode("3", "1");
JobNode n5 = graph1.getOrCreateJobNode("5", "1");
JobNode n7 = graph1.getOrCreateJobNode("7", "1");
JobNode n8 = graph1.getOrCreateJobNode("8", "1");
JobNode n9 = graph1.getOrCreateJobNode("9", "1");
JobNode n10 = graph1.getOrCreateJobNode("10", "1");
JobNode n11 = graph1.getOrCreateJobNode("11", "1");
graph1.addInputStream(genStream(), n5);
graph1.addInputStream(genStream(), n7);
graph1.addInputStream(genStream(), n3);
graph1.addIntermediateStream(genStream(), n5, n11);
graph1.addIntermediateStream(genStream(), n7, n11);
graph1.addIntermediateStream(genStream(), n7, n8);
graph1.addIntermediateStream(genStream(), n3, n8);
graph1.addIntermediateStream(genStream(), n11, n2);
graph1.addIntermediateStream(genStream(), n11, n9);
graph1.addIntermediateStream(genStream(), n8, n9);
graph1.addIntermediateStream(genStream(), n11, n10);
graph1.addOutputStream(genStream(), n2);
graph1.addOutputStream(genStream(), n9);
graph1.addOutputStream(genStream(), n10);
}
/**
* graph2 is a graph with a loop
* 1 -> 2 -> 3 -> 4 -> 5 -> 7
* |<---6 <--| <>
*/
private void createGraph2() {
StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
graph2 = new JobGraph(new MapConfig(), appDesc);
JobNode n1 = graph2.getOrCreateJobNode("1", "1");
JobNode n2 = graph2.getOrCreateJobNode("2", "1");
JobNode n3 = graph2.getOrCreateJobNode("3", "1");
JobNode n4 = graph2.getOrCreateJobNode("4", "1");
JobNode n5 = graph2.getOrCreateJobNode("5", "1");
JobNode n6 = graph2.getOrCreateJobNode("6", "1");
JobNode n7 = graph2.getOrCreateJobNode("7", "1");
graph2.addInputStream(genStream(), n1);
graph2.addIntermediateStream(genStream(), n1, n2);
graph2.addIntermediateStream(genStream(), n2, n3);
graph2.addIntermediateStream(genStream(), n3, n4);
graph2.addIntermediateStream(genStream(), n4, n5);
graph2.addIntermediateStream(genStream(), n4, n6);
graph2.addIntermediateStream(genStream(), n6, n2);
graph2.addIntermediateStream(genStream(), n5, n5);
graph2.addIntermediateStream(genStream(), n5, n7);
graph2.addOutputStream(genStream(), n7);
}
/**
* graph3 is a graph with two self loops
* 1<->1 -> 2<->2
*/
private void createGraph3() {
StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
graph3 = new JobGraph(new MapConfig(), appDesc);
JobNode n1 = graph3.getOrCreateJobNode("1", "1");
JobNode n2 = graph3.getOrCreateJobNode("2", "1");
graph3.addInputStream(genStream(), n1);
graph3.addIntermediateStream(genStream(), n1, n1);
graph3.addIntermediateStream(genStream(), n1, n2);
graph3.addIntermediateStream(genStream(), n2, n2);
}
/**
* graph4 is a graph of single-loop node
* 1<->1
*/
private void createGraph4() {
StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
graph4 = new JobGraph(new MapConfig(), appDesc);
JobNode n1 = graph4.getOrCreateJobNode("1", "1");
graph4.addInputStream(genStream(), n1);
graph4.addIntermediateStream(genStream(), n1, n1);
}
@Before
public void setup() {
createGraph1();
createGraph2();
createGraph3();
createGraph4();
}
@Test
public void testAddSource() {
StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
JobGraph graph = new JobGraph(new MapConfig(), appDesc);
/**
* s1 -> 1
* s2 ->|
*
* s3 -> 2
* |-> 3
*/
JobNode n1 = graph.getOrCreateJobNode("1", "1");
JobNode n2 = graph.getOrCreateJobNode("2", "1");
JobNode n3 = graph.getOrCreateJobNode("3", "1");
StreamSpec s1 = genStream();
StreamSpec s2 = genStream();
StreamSpec s3 = genStream();
graph.addInputStream(s1, n1);
graph.addInputStream(s2, n1);
graph.addInputStream(s3, n2);
graph.addInputStream(s3, n3);
assertTrue(graph.getInputStreams().size() == 3);
assertTrue(graph.getOrCreateJobNode("1", "1").getInEdges().size() == 2);
assertTrue(graph.getOrCreateJobNode("2", "1").getInEdges().size() == 1);
assertTrue(graph.getOrCreateJobNode("3", "1").getInEdges().size() == 1);
assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 0);
assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 1);
assertTrue(graph.getOrCreateStreamEdge(s2).getSourceNodes().size() == 0);
assertTrue(graph.getOrCreateStreamEdge(s2).getTargetNodes().size() == 1);
assertTrue(graph.getOrCreateStreamEdge(s3).getSourceNodes().size() == 0);
assertTrue(graph.getOrCreateStreamEdge(s3).getTargetNodes().size() == 2);
}
@Test
public void testAddSink() {
/**
* 1 -> s1
* 2 -> s2
* 2 -> s3
*/
StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
JobGraph graph = new JobGraph(new MapConfig(), appDesc);
JobNode n1 = graph.getOrCreateJobNode("1", "1");
JobNode n2 = graph.getOrCreateJobNode("2", "1");
StreamSpec s1 = genStream();
StreamSpec s2 = genStream();
StreamSpec s3 = genStream();
graph.addOutputStream(s1, n1);
graph.addOutputStream(s2, n2);
graph.addOutputStream(s3, n2);
assertTrue(graph.getOutputStreams().size() == 3);
assertTrue(graph.getOrCreateJobNode("1", "1").getOutEdges().size() == 1);
assertTrue(graph.getOrCreateJobNode("2", "1").getOutEdges().size() == 2);
assertTrue(graph.getOrCreateStreamEdge(s1).getSourceNodes().size() == 1);
assertTrue(graph.getOrCreateStreamEdge(s1).getTargetNodes().size() == 0);
assertTrue(graph.getOrCreateStreamEdge(s2).getSourceNodes().size() == 1);
assertTrue(graph.getOrCreateStreamEdge(s2).getTargetNodes().size() == 0);
assertTrue(graph.getOrCreateStreamEdge(s3).getSourceNodes().size() == 1);
assertTrue(graph.getOrCreateStreamEdge(s3).getTargetNodes().size() == 0);
}
@Test
public void testReachable() {
Set<JobNode> reachable1 = graph1.findReachable();
assertTrue(reachable1.size() == 8);
Set<JobNode> reachable2 = graph2.findReachable();
assertTrue(reachable2.size() == 7);
}
@Test
public void testTopologicalSort() {
// test graph1
List<JobNode> sortedNodes1 = graph1.topologicalSort();
Map<String, Integer> idxMap1 = new HashMap<>();
for (int i = 0; i < sortedNodes1.size(); i++) {
idxMap1.put(sortedNodes1.get(i).getJobName(), i);
}
assertTrue(idxMap1.size() == 8);
assertTrue(idxMap1.get("11") > idxMap1.get("5"));
assertTrue(idxMap1.get("11") > idxMap1.get("7"));
assertTrue(idxMap1.get("8") > idxMap1.get("7"));
assertTrue(idxMap1.get("8") > idxMap1.get("3"));
assertTrue(idxMap1.get("2") > idxMap1.get("11"));
assertTrue(idxMap1.get("9") > idxMap1.get("8"));
assertTrue(idxMap1.get("9") > idxMap1.get("11"));
assertTrue(idxMap1.get("10") > idxMap1.get("11"));
assertTrue(idxMap1.get("10") > idxMap1.get("3"));
// test graph2
List<JobNode> sortedNodes2 = graph2.topologicalSort();
Map<String, Integer> idxMap2 = new HashMap<>();
for (int i = 0; i < sortedNodes2.size(); i++) {
idxMap2.put(sortedNodes2.get(i).getJobName(), i);
}
assertTrue(idxMap2.size() == 7);
assertTrue(idxMap2.get("2") > idxMap2.get("1"));
assertTrue(idxMap2.get("3") > idxMap2.get("1"));
assertTrue(idxMap2.get("4") > idxMap2.get("1"));
assertTrue(idxMap2.get("6") > idxMap2.get("1"));
assertTrue(idxMap2.get("5") > idxMap2.get("4"));
assertTrue(idxMap2.get("7") > idxMap2.get("5"));
//test graph3
List<JobNode> sortedNodes3 = graph3.topologicalSort();
assertTrue(sortedNodes3.size() == 2);
assertEquals(sortedNodes3.get(0).getJobName(), "1");
assertEquals(sortedNodes3.get(1).getJobName(), "2");
//test graph4
List<JobNode> sortedNodes4 = graph4.topologicalSort();
assertTrue(sortedNodes4.size() == 1);
assertEquals(sortedNodes4.get(0).getJobName(), "1");
}
}