Replace graph code in tracker (#3579)
diff --git a/heron/tools/tracker/src/python/BUILD b/heron/tools/tracker/src/python/BUILD
index 2f6d487..7b06a78 100644
--- a/heron/tools/tracker/src/python/BUILD
+++ b/heron/tools/tracker/src/python/BUILD
@@ -10,6 +10,7 @@
"protobuf==3.8.0",
"tornado==4.0.2",
"javaobj-py3==0.4.1",
+ "networkx==2.4",
],
deps = [
"//heron/common/src/python:common-py",
diff --git a/heron/tools/tracker/src/python/graph.py b/heron/tools/tracker/src/python/graph.py
deleted file mode 100644
index b73f084..0000000
--- a/heron/tools/tracker/src/python/graph.py
+++ /dev/null
@@ -1,133 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-''' graph.py '''
-
-
-################################################################################
-class Graph:
- '''
- Adjacency list of edges in graph. This will correspond to the streams in a topology DAG.
- '''
-
- def __init__(self):
- # graph adjecency list. Implemented as a map with key = vertices and
- # values = set of Vertices in adjacency list.
- self.edges = {}
-
- # pylint: disable=invalid-name
- def add_edge(self, U, V):
- '''
- :param U:
- :param V:
- :return:
- '''
- if not U in self.edges:
- self.edges[U] = set()
- if not V in self.edges:
- self.edges[V] = set()
- if not V in self.edges[U]:
- self.edges[U].add(V)
-
- def __str__(self):
- return str(self.edges)
-
- def bfs_depth(self, U):
- '''
- Returns the maximum distance between any vertex and U in the connected
- component containing U
- :param U:
- :return:
- '''
- bfs_queue = [[U, 0]] # Stores the vertices whose BFS hadn't been completed.
- visited = set()
- max_depth = 0
- while bfs_queue:
- [V, depth] = bfs_queue.pop()
- if max_depth < depth:
- max_depth = depth
- visited.add(V)
- adj_set = self.edges[V]
- for W in adj_set:
- if W not in visited:
- bfs_queue.append([W, depth + 1])
- return max_depth
-
- def diameter(self):
- '''
- Returns the maximum distance between any vertex and U in the connected
- component containing U
- :return:
- '''
- diameter = 0
- for U in self.edges:
- depth = self.bfs_depth(U)
- if depth > diameter:
- diameter = depth
- return diameter
-
-
-################################################################################
-class TopologyDAG(Graph):
- '''
- Creates graph from logical plan, a deeply nested map structure that looks
- like -
- {
- 'spouts' -> {
- spout_name -> {
- 'outputs' -> ['stream_name' -> stream_name]
- }
- },
- 'bolts' -> {
- bolt_name -> {
- 'outputs' -> ['stream_name'->sream_name],
- 'inputs' -> ['stream_name'->stream_name,
- 'component_name'->component_name,
- 'grouping'->grouping_type]
- }
- }
- }
- '''
-
- def __init__(self, logical_plan):
- Graph.__init__(self)
- all_spouts = logical_plan['spouts'] # 'spouts' is required
- all_bolts = dict()
- if 'bolts' in logical_plan:
- all_bolts = logical_plan['bolts']
- stream_source = dict() # Stores the mapping from input stream to component
- # Spout outputs
- for spout_name in all_spouts:
- for output_stream_data in all_spouts[spout_name]['outputs']:
- stream_name = output_stream_data['stream_name']
- stream_source[('%s,%s' % (spout_name, stream_name))] = spout_name
- # Bolt outputs
- for bolt_name in all_bolts:
- for output_stream_data in all_bolts[bolt_name]['outputs']:
- stream_name = output_stream_data['stream_name']
- stream_source[('%s,%s' % (bolt_name, stream_name))] = bolt_name
-
- # Add edges from stream_source to its destination.
- for bolt_name in all_bolts:
- for input_stream_data in all_bolts[bolt_name]['inputs']:
- stream_name = input_stream_data['stream_name']
- component_name = input_stream_data['component_name']
- stream_hash = ('%s,%s' % (component_name, stream_name))
- self.add_edge(stream_source[stream_hash], bolt_name)
diff --git a/heron/tools/tracker/src/python/handlers/logicalplanhandler.py b/heron/tools/tracker/src/python/handlers/logicalplanhandler.py
index 25c2acf..52e9f95 100644
--- a/heron/tools/tracker/src/python/handlers/logicalplanhandler.py
+++ b/heron/tools/tracker/src/python/handlers/logicalplanhandler.py
@@ -18,15 +18,48 @@
# specific language governing permissions and limitations
# under the License.
-""" logicalplanhandler.py """
+"""
+Logical plan objects have the shape:
+ {
+ 'spouts': {
+ spout_name: {
+ 'outputs': [{'stream_name': stream_name}],
+ }
+ },
+ 'bolts': {
+ bolt_name: {
+ 'outputs': [{'stream_name': stream_name}],
+ 'inputs': [{
+ 'stream_name': stream_name,
+ 'component_name': component_name,
+ 'grouping': grouping_type,
+ }]
+ }
+ }
+ }
+
+"""
import traceback
import tornado.gen
import tornado.web
from heron.common.src.python.utils.log import Log
-from heron.tools.tracker.src.python import graph
from heron.tools.tracker.src.python.handlers import BaseHandler
+import networkx
+
+
+def topology_stages(logical_plan):
+ """Return the number of stages in a logical plan."""
+ graph = networkx.DiGraph(
+ (input_info["component_name"], bolt_name)
+ for bolt_name, bolt_info in logical_plan.get("bolts", {}).items()
+ for input_info in bolt_info["inputs"]
+ )
+ # this is the longest directed path (counted in edges); it stands in for the old TopologyDAG "diameter"
+ return networkx.dag_longest_path_length(graph)
+
+
class LogicalPlanHandler(BaseHandler):
"""
@@ -75,10 +108,8 @@
outputs=value["outputs"]
)
- diameter = graph.TopologyDAG(lplan).diameter()
-
result = dict(
- stages=diameter,
+ stages=topology_stages(lplan),
spouts=spouts_map,
bolts=bolts_map
)