Interactive: clean up when pipeline is out of scope (#12339)

* Interactive: clean up when pipeline is out of scope

1. Completed the cleanup routine for all internal states held by the
   current interactive environment.
2. Utilized the environment inspector to determine whether a pipeline is
   out of scope: not assigned to a variable and has no inspectable
   PCollections.
3. Invoked the cleanup every time the user defined pipelines in watched
   scope are refreshed.

Change-Id: Ia0791b865def88e81e7b1595b8430d3a9df9516e

* Fixed a test that didn't start a test stream server. With the new cleanup routine, all test stream servers held by the current interactive environment will be stopped in the test. If the gRPC server has never been started (which happens in tests), the stop operation will hang for a long time.

Change-Id: I2ae7ecf5e3ac11f32888887d82cd885fc64cc82f

Co-authored-by: Ning Kang <ningk@google.com>
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
index 117cd59..1b05285 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -291,7 +291,7 @@
             'data to start at the same time, all captured data has been '
             'cleared and a new segment of data will be recorded.')
 
-    ie.current_env().cleanup()
+    ie.current_env().cleanup(user_pipeline)
     ie.current_env().set_cached_source_signature(
         user_pipeline, current_signature)
   return is_changed
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
index 45c65dd..803f6ce 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
@@ -91,8 +91,6 @@
     sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.')
 class BackgroundCachingJobTest(unittest.TestCase):
   def tearDown(self):
-    for _, job in ie.current_env()._background_caching_jobs.items():
-      job.cancel()
     ie.new_env()
 
   # TODO(BEAM-8335): remove the patches when there are appropriate test sources
@@ -302,9 +300,11 @@
   def test_determine_a_test_stream_service_running(self):
     pipeline = _build_an_empty_stream_pipeline()
     test_stream_service = TestStreamServiceController(reader=None)
+    test_stream_service.start()
     ie.current_env().set_test_stream_service_controller(
         pipeline, test_stream_service)
     self.assertTrue(bcj.is_a_test_stream_service_running(pipeline))
+    # the test_stream_service will be cleaned up on teardown.
 
   def test_stop_a_running_test_stream_service(self):
     pipeline = _build_an_empty_stream_pipeline()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 1d28517..4363d17 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -251,18 +251,32 @@
     return self._inspector
 
   def cleanup(self, pipeline=None):
-    """Cleans up cached states for the given pipeline. Cleans up
-    for all pipelines if no specific pipeline is given."""
+    """Cleans up cached states for the given pipeline. Noop if the given
+    pipeline is absent from the environment. Cleans up for all pipelines
+    if no pipeline is specified."""
     if pipeline:
+      from apache_beam.runners.interactive import background_caching_job as bcj
+      bcj.attempt_to_cancel_background_caching_job(pipeline)
+      bcj.attempt_to_stop_test_stream_service(pipeline)
       cache_manager = self.get_cache_manager(pipeline)
       if cache_manager:
         cache_manager.cleanup()
     else:
+      for _, job in self._background_caching_jobs.items():
+        if job:
+          job.cancel()
+      for _, controller in self._test_stream_service_controllers.items():
+        if controller:
+          controller.stop()
       for _, cache_manager in self._cache_managers.items():
-        cache_manager.cleanup()
+        if cache_manager:
+          cache_manager.cleanup()
 
+    self.evict_background_caching_job(pipeline)
+    self.evict_test_stream_service_controller(pipeline)
     self.evict_computed_pcollections(pipeline)
     self.evict_cached_source_signature(pipeline)
+    self.evict_pipeline_result(pipeline)
 
   def watch(self, watchable):
     """Watches a watchable.
@@ -343,9 +357,13 @@
         'apache_beam.runners.runner.PipelineResult or its subclass')
     self._main_pipeline_results[str(id(pipeline))] = result
 
-  def evict_pipeline_result(self, pipeline):
-    """Evicts the tracking of given pipeline run. Noop if absent."""
-    return self._main_pipeline_results.pop(str(id(pipeline)), None)
+  def evict_pipeline_result(self, pipeline=None):
+    """Evicts the last run result of the given pipeline. Noop if the pipeline
+    is absent from the environment. If no pipeline is specified, evicts for all
+    pipelines."""
+    if pipeline:
+      return self._main_pipeline_results.pop(str(id(pipeline)), None)
+    self._main_pipeline_results.clear()
 
   def pipeline_result(self, pipeline):
     """Gets the pipeline run result. None if absent."""
@@ -364,16 +382,24 @@
     """Gets the background caching job started from the given pipeline."""
     return self._background_caching_jobs.get(str(id(pipeline)), None)
 
+  def evict_background_caching_job(self, pipeline=None):
+    """Evicts the background caching job started from the given pipeline. Noop
+    if the given pipeline is absent from the environment. If no pipeline is
+    specified, evicts for all pipelines."""
+    if pipeline:
+      return self._background_caching_jobs.pop(str(id(pipeline)), None)
+    self._background_caching_jobs.clear()
+
   def set_test_stream_service_controller(self, pipeline, controller):
     """Sets the test stream service controller that has started a gRPC server
-    serving the test stream for any job started from the given user-defined
+    serving the test stream for any job started from the given user defined
     pipeline.
     """
     self._test_stream_service_controllers[str(id(pipeline))] = controller
 
   def get_test_stream_service_controller(self, pipeline):
     """Gets the test stream service controller that has started a gRPC server
-    serving the test stream for any job started from the given user-defined
+    serving the test stream for any job started from the given user defined
     pipeline.
     """
     return self._test_stream_service_controllers.get(str(id(pipeline)), None)
@@ -381,9 +407,12 @@
   def evict_test_stream_service_controller(self, pipeline):
     """Evicts and pops the test stream service controller that has started a
     gRPC server serving the test stream for any job started from the given
-    user-defined pipeline.
+    user defined pipeline. Noop if the given pipeline is absent from the
+    environment. If no pipeline is specified, evicts for all pipelines.
     """
-    return self._test_stream_service_controllers.pop(str(id(pipeline)), None)
+    if pipeline:
+      return self._test_stream_service_controllers.pop(str(id(pipeline)), None)
+    self._test_stream_service_controllers.clear()
 
   def is_terminated(self, pipeline):
     """Queries if the most recent job (by executing the given pipeline) state
@@ -400,13 +429,15 @@
     return self._cached_source_signature.get(str(id(pipeline)), set())
 
   def evict_cached_source_signature(self, pipeline=None):
+    """Evicts the signature generated for each recorded source of the given
+    pipeline. Noop if the given pipeline is absent from the environment. If no
+    pipeline is specified, evicts for all pipelines."""
     if pipeline:
-      self._cached_source_signature.pop(str(id(pipeline)), None)
-    else:
-      self._cached_source_signature.clear()
+      return self._cached_source_signature.pop(str(id(pipeline)), None)
+    self._cached_source_signature.clear()
 
   def track_user_pipelines(self):
-    """Record references to all user-defined pipeline instances watched in
+    """Record references to all user defined pipeline instances watched in
     current environment.
 
     Current static global singleton interactive environment holds references to
@@ -416,11 +447,17 @@
     then handle them differently.
 
     This is invoked every time a PTransform is to be applied if the current
-    code execution is under ipython due to the possibility that any user-defined
+    code execution is under ipython due to the possibility that any user defined
     pipeline can be re-evaluated through notebook cell re-execution at any time.
 
     Each time this is invoked, it will check if there is a cache manager
     already created for each user defined pipeline. If not, create one for it.
+
+    If a pipeline is no longer watched due to re-execution while its
+    PCollections are still in watched scope, the pipeline becomes anonymous but
+    still accessible indirectly through references to its PCollections. This
+    function also clears up internal states for those anonymous pipelines once
+    all their PCollections are anonymous.
     """
     self._tracked_user_pipelines = set()
     for watching in self.watching():
@@ -428,6 +465,17 @@
         if isinstance(val, beam.pipeline.Pipeline):
           self._tracked_user_pipelines.add(val)
           _ = self.get_cache_manager(val, create_if_absent=True)
+    all_tracked_pipeline_ids = set(self._background_caching_jobs.keys()).union(
+        set(self._test_stream_service_controllers.keys()),
+        set(self._cache_managers.keys()),
+        {str(id(pcoll.pipeline))
+         for pcoll in self._computed_pcolls},
+        set(self._cached_source_signature.keys()),
+        set(self._main_pipeline_results.keys()))
+    inspectable_pipelines = self._inspector.inspectable_pipelines
+    for pipeline in all_tracked_pipeline_ids:
+      if pipeline not in inspectable_pipelines:
+        self.cleanup(pipeline)
 
   @property
   def tracked_user_pipelines(self):
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
index 6f44dac..6650c63 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
@@ -236,14 +236,33 @@
   @patch(
       'apache_beam.runners.interactive.interactive_environment'
       '.InteractiveEnvironment.cleanup')
-  def test_cleanup_invoked_when_cache_manager_is_evicted(self, mocked_cleanup):
+  def test_track_user_pipeline_cleanup_non_inspectable_pipeline(
+      self, mocked_cleanup):
     ie._interactive_beam_env = None
     ie.new_env()
-    dummy_pipeline = 'dummy'
+    dummy_pipeline_1 = beam.Pipeline()
+    dummy_pipeline_2 = beam.Pipeline()
+    dummy_pipeline_3 = beam.Pipeline()
+    dummy_pipeline_4 = beam.Pipeline()
+    dummy_pcoll = dummy_pipeline_4 | beam.Create([1])
+    dummy_pipeline_5 = beam.Pipeline()
+    dummy_non_inspectable_pipeline = 'dummy'
+    ie.current_env().watch(locals())
+    from apache_beam.runners.interactive.background_caching_job import BackgroundCachingJob
+    ie.current_env().set_background_caching_job(
+        dummy_pipeline_1,
+        BackgroundCachingJob(
+            runner.PipelineResult(runner.PipelineState.DONE), limiters=[]))
+    ie.current_env().set_test_stream_service_controller(dummy_pipeline_2, None)
     ie.current_env().set_cache_manager(
-        cache.FileBasedCacheManager(), dummy_pipeline)
+        cache.FileBasedCacheManager(), dummy_pipeline_3)
+    ie.current_env().mark_pcollection_computed([dummy_pcoll])
+    ie.current_env().set_cached_source_signature(
+        dummy_non_inspectable_pipeline, None)
+    ie.current_env().set_pipeline_result(
+        dummy_pipeline_5, runner.PipelineResult(runner.PipelineState.RUNNING))
     mocked_cleanup.assert_not_called()
-    ie.current_env().evict_cache_manager(dummy_pipeline)
+    ie.current_env().track_user_pipelines()
     mocked_cleanup.assert_called_once()
 
 
diff --git a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
index 3bce182..a4a9f02 100644
--- a/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
+++ b/sdks/python/apache_beam/runners/interactive/messaging/interactive_environment_inspector.py
@@ -41,6 +41,7 @@
   def __init__(self):
     self._inspectables = {}
     self._anonymous = {}
+    self._inspectable_pipelines = set()
 
   @property
   def inspectables(self):
@@ -49,6 +50,20 @@
     self._inspectables = inspect()
     return self._inspectables
 
+  @property
+  def inspectable_pipelines(self):
+    """Returns a dictionary of all inspectable pipelines. The keys are
+    stringified ids of pipeline instances.
+
+    This includes user defined pipeline assigned to variables and anonymous
+    pipelines with inspectable PCollections.
+    If a user defined pipeline is not within the returned dict, it can be
+    considered out of scope, and all resources and memory states related to it
+    should be released.
+    """
+    _ = self.list_inspectables()
+    return self._inspectable_pipelines
+
   @as_json
   def list_inspectables(self):
     """Lists inspectables in JSON format.
@@ -89,6 +104,8 @@
           pipeline_identifier = obfuscate(meta(pipelines[pipeline], pipeline))
           listing[pipeline_identifier]['pcolls'][identifier] = inspectable[
               'metadata']
+    self._inspectable_pipelines = dict(
+        (str(id(pipeline)), pipeline) for pipeline in pipelines)
     return listing
 
   def get_val(self, identifier):
diff --git a/sdks/python/apache_beam/runners/interactive/options/capture_control.py b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
index 12e901f..ab877b5 100644
--- a/sdks/python/apache_beam/runners/interactive/options/capture_control.py
+++ b/sdks/python/apache_beam/runners/interactive/options/capture_control.py
@@ -29,7 +29,6 @@
 from datetime import timedelta
 
 from apache_beam.io.gcp.pubsub import ReadFromPubSub
-from apache_beam.runners.interactive import background_caching_job as bcj
 from apache_beam.runners.interactive import interactive_environment as ie
 from apache_beam.runners.interactive.options import capture_limiters
 
@@ -71,15 +70,7 @@
   runs, Interactive Beam will capture fresh data."""
   if ie.current_env().options.enable_capture_replay:
     _LOGGER.info(
-        'You have requested Interactive Beam to evict all captured '
+        'You have requested Interactive Beam to evict all recorded '
         'data that could be deterministically replayed among multiple '
         'pipeline runs.')
-  ie.current_env().track_user_pipelines()
-  if pipeline:
-    bcj.attempt_to_cancel_background_caching_job(pipeline)
-    bcj.attempt_to_stop_test_stream_service(pipeline)
-  else:
-    for user_pipeline in ie.current_env().tracked_user_pipelines:
-      bcj.attempt_to_cancel_background_caching_job(user_pipeline)
-      bcj.attempt_to_stop_test_stream_service(user_pipeline)
   ie.current_env().cleanup(pipeline)