[GEARPUMP-349] Optimize Graph topologicalOrderIterator performance
Author: huafengw <fvunicorn@gmail.com>
Closes #223 from huafengw/graph.
diff --git a/core/src/main/scala/org/apache/gearpump/util/Graph.scala b/core/src/main/scala/org/apache/gearpump/util/Graph.scala
index f110f5f..5b48050 100644
--- a/core/src/main/scala/org/apache/gearpump/util/Graph.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/Graph.scala
@@ -17,31 +17,34 @@
*/
package org.apache.gearpump.util
-import scala.annotation.tailrec
+
import scala.collection.mutable
import scala.language.implicitConversions
+import scala.util.{Failure, Success, Try}
/**
* Generic mutable Graph libraries.
*/
class Graph[N, E](vertexList: List[N], edgeList: List[(N, E, N)]) extends Serializable {
-
- private val _vertices = mutable.Set.empty[N]
- private val _edges = mutable.Set.empty[(N, E, N)]
+ private val LOG = LogUtil.getLogger(getClass)
+ private val vertices = mutable.Set.empty[N]
+ private val edges = mutable.Set.empty[(N, E, N)]
+ private val outEdges = mutable.Map.empty[N, mutable.Set[(N, E, N)]]
+ private val inEdges = mutable.Map.empty[N, mutable.Set[(N, E, N)]]
// This is used to ensure the output of this Graph is always stable
- // Like method vertices(), or edges()
- private var _indexs = Map.empty[Any, Int]
- private var _nextIndex = 0
+ // Like method getVertices(), or getEdges()
+ private var indexs = Map.empty[Any, Int]
+ private var nextIndex = 0
private def nextId: Int = {
- val result = _nextIndex
- _nextIndex += 1
+ val result = nextIndex
+ nextIndex += 1
result
}
private def init(): Unit = {
- Option(vertexList).getOrElse(List.empty[N]).foreach(addVertex(_))
- Option(edgeList).getOrElse(List.empty[(N, E, N)]).foreach(addEdge(_))
+ Option(vertexList).getOrElse(List.empty[N]).foreach(addVertex)
+ Option(edgeList).getOrElse(List.empty[(N, E, N)]).foreach(addEdge)
}
init()
@@ -51,20 +54,22 @@
* Current Graph is changed.
*/
def addVertex(vertex: N): Unit = {
- val result = _vertices.add(vertex)
+ val result = vertices.add(vertex)
if (result) {
- _indexs += vertex -> nextId
+ indexs += vertex -> nextId
}
}
/**
- * Add a edge
+ * Add an edge
* Current Graph is changed.
*/
def addEdge(edge: (N, E, N)): Unit = {
- val result = _edges.add(edge)
+ val result = edges.add(edge)
if (result) {
- _indexs += edge -> nextId
+ indexs += edge -> nextId
+ outEdges += edge._1 -> (outgoingEdgesOf(edge._1) + edge)
+ inEdges += edge._3 -> (incomingEdgesOf(edge._3) + edge)
}
}
@@ -72,37 +77,44 @@
* return all vertices.
* The result is stable
*/
- def vertices: List[N] = {
+ def getVertices: List[N] = {
// Sorts the vertex so that we can keep the order for mapVertex
- _vertices.toList.sortBy(_indexs(_))
+ vertices.toList.sortBy(indexs(_))
}
/**
* out degree
*/
def outDegreeOf(node: N): Int = {
- edges.count(_._1 == node)
+ outgoingEdgesOf(node).size
}
/**
* in degree
*/
def inDegreeOf(node: N): Int = {
- edges.count(_._3 == node)
+ incomingEdgesOf(node).size
}
/**
* out going edges.
*/
- def outgoingEdgesOf(node: N): List[(N, E, N)] = {
- edges.filter(_._1 == node)
+ def outgoingEdgesOf(node: N): mutable.Set[(N, E, N)] = {
+ outEdges.getOrElse(node, mutable.Set.empty)
}
/**
* incoming edges.
*/
- def incomingEdgesOf(node: N): List[(N, E, N)] = {
- edges.filter(_._3 == node)
+ def incomingEdgesOf(node: N): mutable.Set[(N, E, N)] = {
+ inEdges.getOrElse(node, mutable.Set.empty)
+ }
+
+ /**
+ * adjacent vertices.
+ */
+ private def adjacentVertices(node: N): List[N] = {
+ outgoingEdgesOf(node).map(_._3).toList
}
/**
@@ -110,10 +122,12 @@
* Current Graph is changed.
*/
def removeVertex(node: N): Unit = {
- _vertices.remove(node)
- _indexs -= node
+ vertices.remove(node)
+ indexs -= node
val toBeRemoved = incomingEdgesOf(node) ++ outgoingEdgesOf(node)
- toBeRemoved.foreach(removeEdge(_))
+ toBeRemoved.foreach(removeEdge)
+ outEdges -= node
+ inEdges -= node
}
/**
@@ -121,8 +135,10 @@
* Current Graph is changed.
*/
private def removeEdge(edge: (N, E, N)): Unit = {
- _indexs -= edge
- _edges.remove(edge)
+ indexs -= edge
+ edges.remove(edge)
+ inEdges.update(edge._3, inEdges(edge._3) - edge)
+ outEdges.update(edge._1, outEdges(edge._1) - edge)
}
/**
@@ -140,14 +156,14 @@
* Current Graph is not changed.
*/
def mapVertex[NewNode](fun: N => NewNode): Graph[NewNode, E] = {
- val vertexes = vertices.map(node => (node, fun(node)))
+ val newVertices = getVertices.map(node => (node, fun(node)))
- val vertexMap: Map[N, NewNode] = vertexes.toMap
+ val vertexMap: Map[N, NewNode] = newVertices.toMap
- val newEdges = edges.map { edge =>
+ val newEdges = getEdges.map { edge =>
(vertexMap(edge._1), edge._2, vertexMap(edge._3))
}
- new Graph(vertexes.map(_._2), newEdges)
+ new Graph(newVertices.map(_._2), newEdges)
}
/**
@@ -155,24 +171,25 @@
* Current graph is not changed.
*/
def mapEdge[NewEdge](fun: (N, E, N) => NewEdge): Graph[N, NewEdge] = {
- val newEdges = edges.map { edge =>
+ val newEdges = getEdges.map { edge =>
(edge._1, fun(edge._1, edge._2, edge._3), edge._3)
}
- new Graph(vertices, newEdges)
+ new Graph(getVertices, newEdges)
}
/**
* edges connected to node
*/
def edgesOf(node: N): List[(N, E, N)] = {
- (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toSet[(N, E, N)].toList.sortBy(_indexs(_))
+ (incomingEdgesOf(node) ++ outgoingEdgesOf(node)).toList
}
/**
* all edges
*/
- def edges: List[(N, E, N)] = {
- _edges.toList.sortBy(_indexs(_))
+ def getEdges: List[(N, E, N)] = {
+ // Sorts the edges so that we can keep the order for mapEdges
+ edges.toList.sortBy(indexs(_))
}
/**
@@ -180,8 +197,8 @@
* Current graph is changed.
*/
def addGraph(other: Graph[N, E]): Graph[N, E] = {
- (vertices ++ other.vertices).foreach(addVertex(_))
- (edges ++ other.edges).foreach(edge => addEdge(edge._1, edge._2, edge._3))
+ (getVertices ++ other.getVertices).foreach(addVertex)
+ (getEdges ++ other.getEdges).foreach(edge => addEdge(edge._1, edge._2, edge._3))
this
}
@@ -189,15 +206,15 @@
* clone the graph
*/
def copy: Graph[N, E] = {
- new Graph(vertices, edges)
+ new Graph(getVertices, getEdges)
}
/**
* check empty
*/
def isEmpty: Boolean = {
- val vertexCount = vertices.size
- val edgeCount = edges.length
+ val vertexCount = getVertices.size
+ val edgeCount = getEdges.length
if (vertexCount + edgeCount == 0) {
true
} else {
@@ -233,8 +250,8 @@
}
private def removeZeroInDegree: List[N] = {
- val toBeRemoved = vertices.filter(inDegreeOf(_) == 0).sortBy(_indexs(_))
- toBeRemoved.foreach(removeVertex(_))
+ val toBeRemoved = getVertices.filter(inDegreeOf(_) == 0)
+ toBeRemoved.foreach(removeVertex)
toBeRemoved
}
@@ -243,13 +260,38 @@
* The node returned by Iterator is stable sorted.
*/
def topologicalOrderIterator: Iterator[N] = {
- val newGraph = copy
- var output = List.empty[N]
-
- while (!newGraph.isEmpty) {
- output ++= newGraph.removeZeroInDegree
+ tryTopologicalOrderIterator match {
+ case Success(iterator) => iterator
+ case Failure(_) =>
+ LOG.warn("Please note this graph is cyclic.")
+ topologicalOrderWithCirclesIterator
}
- output.iterator
+ }
+
+ private def tryTopologicalOrderIterator: Try[Iterator[N]] = {
+ Try {
+ var indegreeMap = getVertices.map(v => v -> inDegreeOf(v)).toMap
+
+ val verticesWithZeroIndegree = mutable.Queue(indegreeMap.filter(_._2 == 0).keys
+ .toList.sortBy(indexs(_)): _*)
+ var output = List.empty[N]
+ var count = 0
+ while (verticesWithZeroIndegree.nonEmpty) {
+ val vertice = verticesWithZeroIndegree.dequeue()
+ adjacentVertices(vertice).foreach { adjacentV =>
+ indegreeMap += adjacentV -> (indegreeMap(adjacentV) - 1)
+ if (indegreeMap(adjacentV) == 0) {
+ verticesWithZeroIndegree.enqueue(adjacentV)
+ }
+ }
+ output :+= vertice
+ count += 1
+ }
+ if (count != getVertices.size) {
+ throw new Exception("There exists a cycle in the graph")
+ }
+ output.iterator
+ }
}
/**
@@ -278,18 +320,18 @@
edge => {
if (!indexMap.contains(edge._3)) {
tarjan(edge._3)
- if (lowLink.get(edge._3).get < lowLink.get(node).get) {
+ if (lowLink(edge._3) < lowLink(node)) {
lowLink(node) = lowLink(edge._3)
}
} else {
- if (inStack.get(edge._3).get && (indexMap.get(edge._3).get < lowLink.get(node).get)) {
+ if (inStack(edge._3) && (indexMap(edge._3) < lowLink(node))) {
lowLink(node) = indexMap(edge._3)
}
}
}
}
- if (indexMap.get(node).get == lowLink.get(node).get) {
+ if (indexMap(node) == lowLink(node)) {
val circle = mutable.MutableList.empty[N]
var n = node
do {
@@ -301,7 +343,7 @@
}
}
- vertices.foreach {
+ getVertices.foreach {
node => {
if (!indexMap.contains(node)) tarjan(node)
}
@@ -318,12 +360,8 @@
* http://www.drdobbs.com/database/topological-sorting/184410262
*/
def topologicalOrderWithCirclesIterator: Iterator[N] = {
- if (hasCycle()) {
- val topo = getAcyclicCopy().topologicalOrderIterator
- topo.flatMap(_.sortBy(_indexs(_)).iterator)
- } else {
- topologicalOrderIterator
- }
+ val topo = getAcyclicCopy().topologicalOrderIterator
+ topo.flatMap(_.sortBy(indexs(_)).iterator)
}
private def getAcyclicCopy(): Graph[mutable.MutableList[N], E] = {
@@ -337,13 +375,13 @@
for (circle1 <- circles; circle2 <- circles; if circle1 != circle2) yield {
for (node1 <- circle1; node2 <- circle2) yield {
- var edges = outgoingEdgesOf(node1)
- for (edge <- edges; if edge._3 == node2) yield {
+ var outgoingEdges = outgoingEdgesOf(node1)
+ for (edge <- outgoingEdges; if edge._3 == node2) yield {
newGraph.addEdge(circle1, edge._2, circle2)
}
- edges = outgoingEdgesOf(node2)
- for (edge <- edges; if edge._3 == node1) yield {
+ outgoingEdges = outgoingEdgesOf(node2)
+ for (edge <- outgoingEdges; if edge._3 == node1) yield {
newGraph.addEdge(circle2, edge._2, circle1)
}
}
@@ -355,26 +393,14 @@
* check whether there is a loop
*/
def hasCycle(): Boolean = {
- @tailrec
- def detectCycle(graph: Graph[N, E]): Boolean = {
- if (graph.edges.isEmpty) {
- false
- } else if (graph.vertices.nonEmpty && !graph.vertices.exists(graph.inDegreeOf(_) == 0)) {
- true
- } else {
- graph.removeZeroInDegree
- detectCycle(graph)
- }
- }
-
- detectCycle(copy)
+ tryTopologicalOrderIterator.isFailure
}
/**
* Check whether there are two edges connecting two nodes.
*/
def hasDuplicatedEdge(): Boolean = {
- edges.groupBy(edge => (edge._1, edge._3)).values.exists(_.size > 1)
+ getEdges.groupBy(edge => (edge._1, edge._3)).values.exists(_.size > 1)
}
/**
@@ -391,7 +417,7 @@
val toBeRemovedLists = newGraph.removeZeroInDegree
val maxLength = toBeRemovedLists.map(_.length).max
for (subGraph <- toBeRemovedLists) {
- val sorted = subGraph.sortBy(_indexs)
+ val sorted = subGraph.sortBy(indexs)
for (i <- sorted.indices) {
output += sorted(i) -> (level + i)
}
@@ -402,8 +428,8 @@
}
override def toString: String = {
- Map("vertices" -> vertices.mkString(","),
- "edges" -> edges.mkString(",")).toString()
+ Map("vertices" -> getVertices.mkString(","),
+ "edges" -> getEdges.mkString(",")).toString()
}
}
@@ -436,7 +462,7 @@
}
def unapply[N, E](graph: Graph[N, E]): Option[(List[N], List[(N, E, N)])] = {
- Some((graph.vertices, graph.edges))
+ Some((graph.getVertices, graph.getEdges))
}
def empty[N, E]: Graph[N, E] = {
diff --git a/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala b/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala
index 6663513..256ac9a 100644
--- a/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/util/GraphSpec.scala
@@ -34,7 +34,7 @@
property("Graph with no edges should be built correctly") {
val vertexSet = Set("A", "B", "C")
val graph = Graph(vertexSet.toSeq.map(Node): _*)
- graph.vertices.toSet shouldBe vertexSet
+ graph.getVertices.toSet shouldBe vertexSet
}
property("Graph with vertices and edges should be built correctly") {
@@ -67,7 +67,7 @@
}
val graph: Graph[Vertex, Edge] = Graph(graphElements: _*)
- graph.vertices should contain theSameElementsAs vertices
+ graph.getVertices should contain theSameElementsAs vertices
0.until(vertices.size).foreach { i =>
val v = vertices(i)
@@ -129,7 +129,7 @@
val newGraph = graph.copy
newGraph.addVertex("C")
- assert(!graph.vertices.toSet.contains("C"), "Graph should be immutable")
+ assert(!graph.getVertices.toSet.contains("C"), "Graph should be immutable")
}
property("subGraph should return a sub-graph for certain vertex") {
diff --git a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
index e033bf1..2f0bbf2 100644
--- a/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
+++ b/examples/pagerank/src/main/scala/org/apache/gearpump/experiments/pagerank/PageRankWorker.scala
@@ -37,13 +37,13 @@
private val graph = conf.getValue[Graph[NodeWithTaskId[_], AnyRef]](PageRankApplication.DAG).get
- private val node = graph.vertices.find { node =>
+ private val node = graph.getVertices.find { node =>
node.taskId == taskContext.taskId.index
}.get
private val downstream = graph.outgoingEdgesOf(node).map(_._3)
.map(id => taskId.copy(index = id.taskId)).toSeq
- private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).length
+ private val upstreamCount = graph.incomingEdgesOf(node).map(_._1).size
LOG.info(s"downstream nodes: $downstream")
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
index d764331..ed5a10d 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/GraphPartitioner.scala
@@ -69,7 +69,7 @@
val local = Graph.empty[Module, Edge]
val remote = Graph.empty[Module, Edge]
- graph.vertices.foreach{ module =>
+ graph.getVertices.foreach{ module =>
if (tags(module) == Local) {
local.addVertex(module)
} else {
@@ -77,7 +77,7 @@
}
}
- graph.edges.foreach{ nodeEdgeNode =>
+ graph.getEdges.foreach{ nodeEdgeNode =>
val (node1, edge, node2) = nodeEdgeNode
(tags(node1), tags(node2)) match {
case (Local, Local) =>
@@ -115,14 +115,14 @@
}
private def tag(graph: Graph[Module, Edge], strategy: Strategy): Map[Module, Location] = {
- graph.vertices.map{vertex =>
+ graph.getVertices.map{ vertex =>
vertex -> strategy.apply(vertex)
}.toMap
}
private def removeDummyModule(inputGraph: Graph[Module, Edge]): Graph[Module, Edge] = {
val graph = inputGraph.copy
- val dummies = graph.vertices.filter {module =>
+ val dummies = graph.getVertices.filter { module =>
module match {
case dummy: DummyModule =>
true
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
index 477f4d3..7d07ca8 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/LocalMaterializerImpl.scala
@@ -210,10 +210,10 @@
def buildToplevelModule(graph: GGraph[Module, Edge]): Module = {
var moduleInProgress: Module = EmptyModule
- graph.vertices.foreach(module => {
+ graph.getVertices.foreach(module => {
moduleInProgress = moduleInProgress.compose(module)
})
- graph.edges.foreach(value => {
+ graph.getEdges.foreach(value => {
val (node1, edge, node2) = value
moduleInProgress = moduleInProgress.wire(edge.from, edge.to)
})
@@ -232,7 +232,7 @@
session.materializeAtomic(module.asInstanceOf[AtomicModule], module.attributes, matV)
matV.get(module)
}
- materializedGraph.edges.foreach { nodeEdgeNode =>
+ materializedGraph.getEdges.foreach { nodeEdgeNode =>
val (node1, edge, node2) = nodeEdgeNode
val from = edge.from
val to = edge.to
@@ -248,7 +248,7 @@
case _ =>
}
}
- val matValSources = graph.vertices.flatMap(module => {
+ val matValSources = graph.getVertices.flatMap(module => {
val rt: Option[MaterializedValueSource[_]] = module match {
case graphStage: GraphStageModule =>
graphStage.stage match {
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
index a62b8e3..a2a5185 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -86,7 +86,7 @@
private def junctionConfig(processorIds: Map[Module, ProcessorId]):
Map[ProcessorId, UserConfig] = {
- val updatedConfigs = graph.vertices.flatMap { vertex =>
+ val updatedConfigs = graph.getVertices.flatMap { vertex =>
buildShape(vertex, processorIds)
}.toMap
updatedConfigs
@@ -119,7 +119,7 @@
Map[Module, ProcessorId] = {
ids.flatMap { kv =>
val (module, id) = kv
- val processorId = app.dag.vertices.find { processor =>
+ val processorId = app.dag.getVertices.find { processor =>
processor.taskConf.getString(id).isDefined
}.map(_.id)
processorId.map((module, _))
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
index 9cf5009..fb89268 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/GraphBuilderSpec.scala
@@ -45,8 +45,8 @@
val graph = GraphBuilder.build(topology)
- graph.edges.size shouldBe 1
- val (from, edge, to) = graph.edges.head
+ graph.getEdges.size shouldBe 1
+ val (from, edge, to) = graph.getEdges.head
from shouldBe sourceProcessor
edge shouldBe a[StormPartitioner]
to shouldBe targetProcessor
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala
index 2bf6b6b..c5b3990 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/util/UpickleSpec.scala
@@ -43,8 +43,8 @@
val deserialized = read[Graph[Int, String]](serialized)
- graph.vertices.toSet shouldBe deserialized.vertices.toSet
- graph.edges.toSet shouldBe deserialized.edges.toSet
+ graph.getVertices.toSet shouldBe deserialized.getVertices.toSet
+ graph.getEdges.toSet shouldBe deserialized.getEdges.toSet
}
"MetricType" should "be able to serialize/deserialize correctly" in {
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
index 8ad74f8..c43af2f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/DAG.scala
@@ -49,7 +49,7 @@
object DAG {
def apply(graph: Graph[ProcessorDescription, PartitionerDescription], version: Int = 0): DAG = {
- val processors = graph.vertices.map { processorDescription =>
+ val processors = graph.getVertices.map { processorDescription =>
(processorDescription.id, processorDescription)
}.toMap
val dag = graph.mapVertex { processor =>
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index f15e1b3..60a3897 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -150,11 +150,7 @@
name: String, dag: Graph[T, P], userConfig: UserConfig): StreamApplication = {
import org.apache.gearpump.streaming.Processor._
- if (dag.hasCycle()) {
- LOG.warn(s"Detected cycles in DAG of application $name!")
- }
-
- val indices = dag.topologicalOrderWithCirclesIterator.toList.zipWithIndex.toMap
+ val indices = dag.topologicalOrderIterator.toList.zipWithIndex.toMap
val graph = dag.mapVertex { processor =>
val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor)
updatedProcessor
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index 90141d4..57602c5 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -397,7 +397,7 @@
}
if (isClockStalling) {
- val processorId = dag.graph.topologicalOrderWithCirclesIterator.toList.find { processorId =>
+ val processorId = dag.graph.topologicalOrderIterator.toList.find { processorId =>
val clock = processorClocks.get(processorId)
if (clock.isDefined) {
clock.get.min == minClock.appClock
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index 04b5337..77083f0 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -67,7 +67,7 @@
private def optimize(dag: Graph[Op, OpEdge])
(implicit system: ActorSystem): Graph[Op, OpEdge] = {
val graph = dag.copy
- val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse
+ val nodes = graph.topologicalOrderIterator.toList.reverse
for (node <- nodes) {
val outGoingEdges = graph.outgoingEdgesOf(node)
for (edge <- outGoingEdges) {
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
index ccda8f0..f9e2efd 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
@@ -38,7 +38,7 @@
dag.processors.size shouldBe 1
assert(dag.taskCount == parallelism)
dag.tasks.sortBy(_.index) shouldBe (0 until parallelism).map(index => TaskId(0, index))
- dag.graph.edges shouldBe empty
+ dag.graph.getEdges shouldBe empty
}
}
}
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index 70d21b5..be4cc63 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -86,7 +86,7 @@
val plan = planner.plan(graph)
.mapVertex(_.description)
- plan.vertices.toSet should contain theSameElementsAs
+ plan.getVertices.toSet should contain theSameElementsAs
Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink")
plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe
a[GroupByPartitioner[_, _]]
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
index c8c8b9f..d43bca0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -58,8 +58,8 @@
application.name shouldBe "dsl"
val dag = application.userConfig
.getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
- dag.vertices.size shouldBe 2
- dag.vertices.foreach { processor =>
+ dag.getVertices.size shouldBe 2
+ dag.getVertices.foreach { processor =>
processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
if (processor.description == "A.globalWindows") {
processor.parallelism shouldBe 2
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index ef8f932..0b8abcd 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -85,8 +85,10 @@
}
val expectedDagTopology = getExpectedDagTopology
- dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet
- dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet
+ dagTopology.getVertices.toSet should
+ contain theSameElementsAs expectedDagTopology.getVertices.toSet
+ dagTopology.getEdges.toSet should
+ contain theSameElementsAs expectedDagTopology.getEdges.toSet
}
private def getExpectedDagTopology: Graph[String, String] = {