Replace graph code in tracker (#3579)

diff --git a/heron/tools/tracker/src/python/BUILD b/heron/tools/tracker/src/python/BUILD
index 2f6d487..7b06a78 100644
--- a/heron/tools/tracker/src/python/BUILD
+++ b/heron/tools/tracker/src/python/BUILD
@@ -10,6 +10,7 @@
         "protobuf==3.8.0",
         "tornado==4.0.2",
         "javaobj-py3==0.4.1",
+        "networkx==2.4",
     ],
     deps = [
         "//heron/common/src/python:common-py",
diff --git a/heron/tools/tracker/src/python/graph.py b/heron/tools/tracker/src/python/graph.py
deleted file mode 100644
index b73f084..0000000
--- a/heron/tools/tracker/src/python/graph.py
+++ /dev/null
@@ -1,133 +0,0 @@
-#!/usr/bin/env python3
-# -*- encoding: utf-8 -*-
-
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing,
-#  software distributed under the License is distributed on an
-#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-#  KIND, either express or implied.  See the License for the
-#  specific language governing permissions and limitations
-#  under the License.
-
-''' graph.py '''
-
-
-################################################################################
-class Graph:
-  '''
-  Adjacency list of edges in graph. This will correspond to the streams in a topology DAG.
-  '''
-
-  def __init__(self):
-    # graph adjecency list. Implemented as a map with key = vertices and
-    # values = set of Vertices in adjacency list.
-    self.edges = {}
-
-  # pylint: disable=invalid-name
-  def add_edge(self, U, V):
-    '''
-    :param U:
-    :param V:
-    :return:
-    '''
-    if not U in self.edges:
-      self.edges[U] = set()
-    if not V in self.edges:
-      self.edges[V] = set()
-    if not V in self.edges[U]:
-      self.edges[U].add(V)
-
-  def __str__(self):
-    return str(self.edges)
-
-  def bfs_depth(self, U):
-    '''
-    Returns the maximum distance between any vertex and U in the connected
-    component containing U
-    :param U:
-    :return:
-    '''
-    bfs_queue = [[U, 0]]  # Stores the vertices whose BFS hadn't been completed.
-    visited = set()
-    max_depth = 0
-    while bfs_queue:
-      [V, depth] = bfs_queue.pop()
-      if max_depth < depth:
-        max_depth = depth
-      visited.add(V)
-      adj_set = self.edges[V]
-      for W in adj_set:
-        if W not in visited:
-          bfs_queue.append([W, depth + 1])
-    return max_depth
-
-  def diameter(self):
-    '''
-    Returns the maximum distance between any vertex and U in the connected
-    component containing U
-    :return:
-    '''
-    diameter = 0
-    for U in self.edges:
-      depth = self.bfs_depth(U)
-      if depth > diameter:
-        diameter = depth
-    return diameter
-
-
-################################################################################
-class TopologyDAG(Graph):
-  '''
-  Creates graph from logical plan, a deeply nested map structure that looks
-  like -
-  {
-   'spouts' -> {
-      spout_name -> {
-        'outputs' -> ['stream_name' -> stream_name]
-      }
-   },
-   'bolts' -> {
-     bolt_name -> {
-       'outputs' -> ['stream_name'->sream_name],
-       'inputs' -> ['stream_name'->stream_name,
-                      'component_name'->component_name,
-                      'grouping'->grouping_type]
-     }
-   }
-  }
-  '''
-
-  def __init__(self, logical_plan):
-    Graph.__init__(self)
-    all_spouts = logical_plan['spouts']  # 'spouts' is required
-    all_bolts = dict()
-    if 'bolts' in logical_plan:
-      all_bolts = logical_plan['bolts']
-    stream_source = dict()  # Stores the mapping from input stream to component
-    # Spout outputs
-    for spout_name in all_spouts:
-      for output_stream_data in all_spouts[spout_name]['outputs']:
-        stream_name = output_stream_data['stream_name']
-        stream_source[('%s,%s' % (spout_name, stream_name))] = spout_name
-    # Bolt outputs
-    for bolt_name in all_bolts:
-      for output_stream_data in all_bolts[bolt_name]['outputs']:
-        stream_name = output_stream_data['stream_name']
-        stream_source[('%s,%s' % (bolt_name, stream_name))] = bolt_name
-
-    # Add edges from stream_source to its destination.
-    for bolt_name in all_bolts:
-      for input_stream_data in all_bolts[bolt_name]['inputs']:
-        stream_name = input_stream_data['stream_name']
-        component_name = input_stream_data['component_name']
-        stream_hash = ('%s,%s' % (component_name, stream_name))
-        self.add_edge(stream_source[stream_hash], bolt_name)
diff --git a/heron/tools/tracker/src/python/handlers/logicalplanhandler.py b/heron/tools/tracker/src/python/handlers/logicalplanhandler.py
index 25c2acf..52e9f95 100644
--- a/heron/tools/tracker/src/python/handlers/logicalplanhandler.py
+++ b/heron/tools/tracker/src/python/handlers/logicalplanhandler.py
@@ -18,15 +18,48 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-""" logicalplanhandler.py """
+"""
+Logical plan objects have the shape:
+  {
+   'spouts': {
+      spout_name: {
+        'outputs': [{'stream_name': stream_name}],
+      }
+   },
+   'bolts': {
+     bolt_name: {
+       'outputs': [{'stream_name': stream_name}],
+       'inputs': [{
+         'stream_name': stream_name,
+         'component_name': component_name,
+         'grouping': grouping_type,
+       }]
+     }
+   }
+  }
+
+"""
 import traceback
 import tornado.gen
 import tornado.web
 
 from heron.common.src.python.utils.log import Log
-from heron.tools.tracker.src.python import graph
 from heron.tools.tracker.src.python.handlers import BaseHandler
 
+import networkx
+
+
+def topology_stages(logical_plan):
+  """Return the number of stages in a logical plan."""
+  graph = networkx.DiGraph(
+      (input_info["component_name"], bolt_name)
+      for bolt_name, bolt_info in logical_plan.get("bolts", {}).items()
+      for input_info in bolt_info["inputs"]
+  )
+  # this is is the same as "diameter" if treating the topology as an undirected graph
+  return networkx.dag_longest_path_length(graph)
+
+
 
 class LogicalPlanHandler(BaseHandler):
   """
@@ -75,10 +108,8 @@
             outputs=value["outputs"]
         )
 
-      diameter = graph.TopologyDAG(lplan).diameter()
-
       result = dict(
-          stages=diameter,
+          stages=topology_stages(lplan),
           spouts=spouts_map,
           bolts=bolts_map
       )