Merge pull request #10131 from Ardagan/LowerExamplesTimeout

Reduce Java Examples Dataflow Precommit timeout
diff --git a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
index 4628c25..8eefec7 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
@@ -42,6 +42,8 @@
 
 @unittest.skipIf(not ie.current_env().is_interactive_ready,
                  '[interactive] dependency is not installed.')
+@unittest.skipIf(sys.version_info < (3, 6),
+                 'The tests require at least Python 3.6 to work.')
 class PCollectionVisualizationTest(unittest.TestCase):
 
   def setUp(self):
@@ -56,8 +58,6 @@
     # pylint: disable=range-builtin-not-iterating
     self._pcoll = self._p | 'Create' >> beam.Create(range(1000))
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   def test_raise_error_for_non_pcoll_input(self):
     class Foo(object):
       pass
@@ -67,8 +67,6 @@
       self.assertTrue('pcoll should be apache_beam.pvalue.PCollection' in
                       ctx.exception)
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   def test_pcoll_visualization_generate_unique_display_id(self):
     pv_1 = pv.PCollectionVisualization(self._pcoll)
     pv_2 = pv.PCollectionVisualization(self._pcoll)
@@ -76,8 +74,6 @@
     self.assertNotEqual(pv_1._overview_display_id, pv_2._overview_display_id)
     self.assertNotEqual(pv_1._df_display_id, pv_2._df_display_id)
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
          '.PCollectionVisualization._to_element_list', lambda x: [1, 2, 3])
   def test_one_shot_visualization_not_return_handle(self):
@@ -91,8 +87,6 @@
     yield [1, 2, 3, 4, 5, 6, 7]
     yield [1, 2, 3, 4, 5, 6, 7, 8]
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
          '.PCollectionVisualization._to_element_list', _mock_to_element_list)
   def test_dynamic_plotting_return_handle(self):
@@ -100,8 +94,6 @@
     self.assertIsInstance(h, timeloop.Timeloop)
     h.stop()
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
          '.PCollectionVisualization._to_element_list', _mock_to_element_list)
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
@@ -126,10 +118,6 @@
       self.assertIs(kwargs['updating_pv'], updating_pv)
     h.stop()
 
-  # The code being tested supports 3.5.3+. This specific test has assertion
-  # feature that was introduced in 3.6.
-  @unittest.skipIf(sys.version_info < (3, 6),
-                   'The test requires Python 3.6+.')
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
          '.PCollectionVisualization._to_element_list', _mock_to_element_list)
   @patch('timeloop.Timeloop.stop')
@@ -150,8 +138,6 @@
     # "assert_called" is new in Python 3.6.
     mocked_timeloop.assert_called()
 
-  @unittest.skipIf(sys.version_info < (3, 5, 3),
-                   'PCollectionVisualization is supported on Python 3.5.3+.')
   @patch('apache_beam.runners.interactive.display.pcoll_visualization'
          '.PCollectionVisualization._to_element_list', lambda x: [1, 2, 3])
   @patch('pandas.DataFrame.sample')
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 2dbc102..414d564 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -24,6 +24,7 @@
 """
 from __future__ import absolute_import
 
+import atexit
 import importlib
 import logging
 import sys
@@ -32,6 +33,9 @@
 from apache_beam.runners import runner
 from apache_beam.runners.utils import is_interactive
 
+# The Interactive Beam user flow is data-centric rather than pipeline-centric,
+# so there is only one global interactive environment instance, which manages
+# the implementation that enables interactivity.
 _interactive_beam_env = None
 
 
@@ -46,6 +50,8 @@
 def new_env(cache_manager=None):
   """Creates a new Interactive Beam environment to replace current one."""
   global _interactive_beam_env
+  if _interactive_beam_env:
+    _interactive_beam_env.cleanup()
   _interactive_beam_env = None
   return current_env(cache_manager)
 
@@ -63,6 +69,9 @@
 
   def __init__(self, cache_manager=None):
     self._cache_manager = cache_manager
+    # Register a cleanup routine to run when the kernel restarts or terminates.
+    if cache_manager:
+      atexit.register(self.cleanup)
     # Holds class instances, module object, string of module names.
     self._watching_set = set()
     # Holds variables list of (Dict[str, object]).
@@ -74,10 +83,10 @@
     self._pipeline_results = {}
     # Always watch __main__ module.
     self.watch('__main__')
-    # Do a warning level logging if current python version is below 3.5.3.
-    if sys.version_info < (3, 5, 3):
+    # Log a warning if the current Python version is below 3.6.
+    if sys.version_info < (3, 6):
       self._is_py_version_ready = False
-      logging.warning('Interactive Beam requires Python 3.5.3+.')
+      logging.warning('Interactive Beam requires Python 3.6+.')
     else:
       self._is_py_version_ready = True
     # Check if [interactive] dependencies are installed.
@@ -127,6 +136,11 @@
     """
     return self._is_in_notebook
 
+  def cleanup(self):
+    # Delegates to the cache manager, if one is set, to clean up its cache.
+    if self.cache_manager():
+      self.cache_manager().cleanup()
+
   def watch(self, watchable):
     """Watches a watchable.
 
@@ -163,7 +177,18 @@
 
   def set_cache_manager(self, cache_manager):
     """Sets the cache manager held by current Interactive Environment."""
+    if self._cache_manager is cache_manager:
+      # NOOP if setting to the same cache_manager.
+      return
+    if self._cache_manager:
+      # Invoke cleanup routine when a new cache_manager is forcefully set and
+      # current cache_manager is not None.
+      self.cleanup()
+      atexit.unregister(self.cleanup)
     self._cache_manager = cache_manager
+    if self._cache_manager:
+      # Re-register cleanup routine for the new cache_manager if it's not None.
+      atexit.register(self.cleanup)
 
   def cache_manager(self):
     """Gets the cache manager held by current Interactive Environment."""
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
index 6fa257b..76c29b8 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
@@ -19,16 +19,27 @@
 from __future__ import absolute_import
 
 import importlib
+import sys
 import unittest
 
 import apache_beam as beam
 from apache_beam.runners import runner
+from apache_beam.runners.interactive import cache_manager as cache
 from apache_beam.runners.interactive import interactive_environment as ie
 
+# TODO(BEAM-8288): clean up the work-around of nose tests using Python2 without
+# unittest.mock module.
+try:
+  from unittest.mock import call, patch
+except ImportError:
+  from mock import call, patch
+
 # The module name is also a variable in module.
 _module_name = 'apache_beam.runners.interactive.interactive_environment_test'
 
 
+@unittest.skipIf(sys.version_info < (3, 6),
+                 'The tests require at least Python 3.6 to work.')
 class InteractiveEnvironmentTest(unittest.TestCase):
 
   def setUp(self):
@@ -152,11 +163,91 @@
                   pipeline_result)
     self.assertIs(ie.current_env().pipeline_result(self._p), None)
 
-  def test_is_none_when_pipeline_absent(self):
+  def test_pipeline_result_is_none_when_pipeline_absent(self):
     self.assertIs(ie.current_env().pipeline_result(self._p), None)
     self.assertIs(ie.current_env().is_terminated(self._p), True)
     self.assertIs(ie.current_env().evict_pipeline_result(self._p), None)
 
+  @patch('atexit.register')
+  def test_no_cleanup_when_cm_none(self,
+                                   mocked_atexit):
+    ie.new_env(None)
+    mocked_atexit.assert_not_called()
+
+  @patch('atexit.register')
+  def test_cleanup_when_cm_not_none(self,
+                                    mocked_atexit):
+    ie.new_env(cache.FileBasedCacheManager())
+    mocked_atexit.assert_called_once()
+
+  @patch('atexit.register')
+  @patch('atexit.unregister')
+  def test_cleanup_unregistered_when_not_none_cm_cleared(self,
+                                                         mocked_unreg,
+                                                         mocked_reg):
+    ie.new_env(cache.FileBasedCacheManager())
+    mocked_reg.assert_called_once()
+    mocked_unreg.assert_not_called()
+    ie.current_env().set_cache_manager(None)
+    mocked_reg.assert_called_once()
+    mocked_unreg.assert_called_once()
+
+  @patch('atexit.register')
+  @patch('atexit.unregister')
+  def test_cleanup_reregistered_when_cm_changed(self,
+                                                mocked_unreg,
+                                                mocked_reg):
+    ie.new_env(cache.FileBasedCacheManager())
+    mocked_unreg.assert_not_called()
+    ie.current_env().set_cache_manager(cache.FileBasedCacheManager())
+    mocked_unreg.assert_called_once()
+    mocked_reg.assert_has_calls([call(ie.current_env().cleanup),
+                                 call(ie.current_env().cleanup)])
+
+  @patch('apache_beam.runners.interactive.interactive_environment'
+         '.InteractiveEnvironment.cleanup')
+  def test_cleanup_invoked_when_new_env_replace_not_none_env(self,
+                                                             mocked_cleanup):
+    ie._interactive_beam_env = None
+    ie.new_env(cache.FileBasedCacheManager())
+    mocked_cleanup.assert_not_called()
+    ie.new_env(cache.FileBasedCacheManager())
+    mocked_cleanup.assert_called_once()
+
+  @patch('apache_beam.runners.interactive.interactive_environment'
+         '.InteractiveEnvironment.cleanup')
+  def test_cleanup_invoked_when_cm_changed(self,
+                                           mocked_cleanup):
+    ie._interactive_beam_env = None
+    ie.new_env(cache.FileBasedCacheManager())
+    ie.current_env().set_cache_manager(cache.FileBasedCacheManager())
+    mocked_cleanup.assert_called_once()
+
+  @patch('atexit.register')
+  @patch('atexit.unregister')
+  def test_cleanup_registered_when_none_cm_changed(self,
+                                                   mocked_unreg,
+                                                   mocked_reg):
+    ie.new_env(None)
+    mocked_reg.assert_not_called()
+    mocked_unreg.assert_not_called()
+    ie.current_env().set_cache_manager(cache.FileBasedCacheManager())
+    mocked_reg.assert_called_once()
+    mocked_unreg.assert_not_called()
+
+  @patch('atexit.register')
+  @patch('atexit.unregister')
+  def test_noop_when_cm_is_not_changed(self,
+                                       mocked_unreg,
+                                       mocked_reg):
+    cache_manager = cache.FileBasedCacheManager()
+    ie.new_env(cache_manager)
+    mocked_unreg.assert_not_called()
+    mocked_reg.assert_called_once()
+    ie.current_env().set_cache_manager(cache_manager)
+    mocked_unreg.assert_not_called()
+    mocked_reg.assert_called_once()
+
 
 if __name__ == '__main__':
   unittest.main()