Merge pull request #10131 from Ardagan/LowerExamplesTimeout
Reduce Java Examples Dataflow Precommit timeout
diff --git a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
index 4628c25..8eefec7 100644
--- a/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
+++ b/sdks/python/apache_beam/runners/interactive/display/pcoll_visualization_test.py
@@ -42,6 +42,8 @@
@unittest.skipIf(not ie.current_env().is_interactive_ready,
'[interactive] dependency is not installed.')
+@unittest.skipIf(sys.version_info < (3, 6),
+ 'The tests require at least Python 3.6 to work.')
class PCollectionVisualizationTest(unittest.TestCase):
def setUp(self):
@@ -56,8 +58,6 @@
# pylint: disable=range-builtin-not-iterating
self._pcoll = self._p | 'Create' >> beam.Create(range(1000))
- @unittest.skipIf(sys.version_info < (3, 5, 3),
- 'PCollectionVisualization is supported on Python 3.5.3+.')
def test_raise_error_for_non_pcoll_input(self):
class Foo(object):
pass
@@ -67,8 +67,6 @@
self.assertTrue('pcoll should be apache_beam.pvalue.PCollection' in
ctx.exception)
- @unittest.skipIf(sys.version_info < (3, 5, 3),
- 'PCollectionVisualization is supported on Python 3.5.3+.')
def test_pcoll_visualization_generate_unique_display_id(self):
pv_1 = pv.PCollectionVisualization(self._pcoll)
pv_2 = pv.PCollectionVisualization(self._pcoll)
@@ -76,8 +74,6 @@
self.assertNotEqual(pv_1._overview_display_id, pv_2._overview_display_id)
self.assertNotEqual(pv_1._df_display_id, pv_2._df_display_id)
- @unittest.skipIf(sys.version_info < (3, 5, 3),
- 'PCollectionVisualization is supported on Python 3.5.3+.')
@patch('apache_beam.runners.interactive.display.pcoll_visualization'
'.PCollectionVisualization._to_element_list', lambda x: [1, 2, 3])
def test_one_shot_visualization_not_return_handle(self):
@@ -91,8 +87,6 @@
yield [1, 2, 3, 4, 5, 6, 7]
yield [1, 2, 3, 4, 5, 6, 7, 8]
- @unittest.skipIf(sys.version_info < (3, 5, 3),
- 'PCollectionVisualization is supported on Python 3.5.3+.')
@patch('apache_beam.runners.interactive.display.pcoll_visualization'
'.PCollectionVisualization._to_element_list', _mock_to_element_list)
def test_dynamic_plotting_return_handle(self):
@@ -100,8 +94,6 @@
self.assertIsInstance(h, timeloop.Timeloop)
h.stop()
- @unittest.skipIf(sys.version_info < (3, 5, 3),
- 'PCollectionVisualization is supported on Python 3.5.3+.')
@patch('apache_beam.runners.interactive.display.pcoll_visualization'
'.PCollectionVisualization._to_element_list', _mock_to_element_list)
@patch('apache_beam.runners.interactive.display.pcoll_visualization'
@@ -126,10 +118,6 @@
self.assertIs(kwargs['updating_pv'], updating_pv)
h.stop()
- # The code being tested supports 3.5.3+. This specific test has assertion
- # feature that was introduced in 3.6.
- @unittest.skipIf(sys.version_info < (3, 6),
- 'The test requires Python 3.6+.')
@patch('apache_beam.runners.interactive.display.pcoll_visualization'
'.PCollectionVisualization._to_element_list', _mock_to_element_list)
@patch('timeloop.Timeloop.stop')
@@ -150,8 +138,6 @@
# "assert_called" is new in Python 3.6.
mocked_timeloop.assert_called()
- @unittest.skipIf(sys.version_info < (3, 5, 3),
- 'PCollectionVisualization is supported on Python 3.5.3+.')
@patch('apache_beam.runners.interactive.display.pcoll_visualization'
'.PCollectionVisualization._to_element_list', lambda x: [1, 2, 3])
@patch('pandas.DataFrame.sample')
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 2dbc102..414d564 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -24,6 +24,7 @@
"""
from __future__ import absolute_import
+import atexit
import importlib
import logging
import sys
@@ -32,6 +33,9 @@
from apache_beam.runners import runner
from apache_beam.runners.utils import is_interactive
+# Interactive Beam user flow is data-centric rather than pipeline-centric, so
+# there is only one global interactive environment instance, which manages the
+# implementation details that enable interactivity.
_interactive_beam_env = None
@@ -46,6 +50,8 @@
def new_env(cache_manager=None):
"""Creates a new Interactive Beam environment to replace current one."""
global _interactive_beam_env
+ if _interactive_beam_env:
+ _interactive_beam_env.cleanup()
_interactive_beam_env = None
return current_env(cache_manager)
@@ -63,6 +69,9 @@
def __init__(self, cache_manager=None):
self._cache_manager = cache_manager
+ # Register a cleanup routine when kernel is restarted or terminated.
+ if cache_manager:
+ atexit.register(self.cleanup)
# Holds class instances, module object, string of module names.
self._watching_set = set()
# Holds variables list of (Dict[str, object]).
@@ -74,10 +83,10 @@
self._pipeline_results = {}
# Always watch __main__ module.
self.watch('__main__')
- # Do a warning level logging if current python version is below 3.5.3.
- if sys.version_info < (3, 5, 3):
+ # Do a warning level logging if current python version is below 3.6.
+ if sys.version_info < (3, 6):
self._is_py_version_ready = False
- logging.warning('Interactive Beam requires Python 3.5.3+.')
+ logging.warning('Interactive Beam requires Python 3.6+.')
else:
self._is_py_version_ready = True
# Check if [interactive] dependencies are installed.
@@ -127,6 +136,11 @@
"""
return self._is_in_notebook
+ def cleanup(self):
+ # Utilizes cache manager to clean up cache from everywhere.
+ if self.cache_manager():
+ self.cache_manager().cleanup()
+
def watch(self, watchable):
"""Watches a watchable.
@@ -163,7 +177,18 @@
def set_cache_manager(self, cache_manager):
"""Sets the cache manager held by current Interactive Environment."""
+ if self._cache_manager is cache_manager:
+ # NOOP if setting to the same cache_manager.
+ return
+ if self._cache_manager:
+ # Invoke cleanup routine when a new cache_manager is forcefully set and
+ # current cache_manager is not None.
+ self.cleanup()
+ atexit.unregister(self.cleanup)
self._cache_manager = cache_manager
+ if self._cache_manager:
+ # Re-register cleanup routine for the new cache_manager if it's not None.
+ atexit.register(self.cleanup)
def cache_manager(self):
"""Gets the cache manager held by current Interactive Environment."""
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
index 6fa257b..76c29b8 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
@@ -19,16 +19,27 @@
from __future__ import absolute_import
import importlib
+import sys
import unittest
import apache_beam as beam
from apache_beam.runners import runner
+from apache_beam.runners.interactive import cache_manager as cache
from apache_beam.runners.interactive import interactive_environment as ie
+# TODO(BEAM-8288): remove this work-around once nose tests no longer run on
+# Python 2, which lacks the unittest.mock module.
+try:
+ from unittest.mock import call, patch
+except ImportError:
+ from mock import call, patch
+
# The module name is also a variable in module.
_module_name = 'apache_beam.runners.interactive.interactive_environment_test'
+@unittest.skipIf(sys.version_info < (3, 6),
+ 'The tests require at least Python 3.6 to work.')
class InteractiveEnvironmentTest(unittest.TestCase):
def setUp(self):
@@ -152,11 +163,91 @@
pipeline_result)
self.assertIs(ie.current_env().pipeline_result(self._p), None)
- def test_is_none_when_pipeline_absent(self):
+ def test_pipeline_result_is_none_when_pipeline_absent(self):
self.assertIs(ie.current_env().pipeline_result(self._p), None)
self.assertIs(ie.current_env().is_terminated(self._p), True)
self.assertIs(ie.current_env().evict_pipeline_result(self._p), None)
+ @patch('atexit.register')
+ def test_no_cleanup_when_cm_none(self,
+ mocked_atexit):
+ ie.new_env(None)
+ mocked_atexit.assert_not_called()
+
+ @patch('atexit.register')
+ def test_cleanup_when_cm_not_none(self,
+ mocked_atexit):
+ ie.new_env(cache.FileBasedCacheManager())
+ mocked_atexit.assert_called_once()
+
+ @patch('atexit.register')
+ @patch('atexit.unregister')
+ def test_cleanup_unregistered_when_not_none_cm_cleared(self,
+ mocked_unreg,
+ mocked_reg):
+ ie.new_env(cache.FileBasedCacheManager())
+ mocked_reg.assert_called_once()
+ mocked_unreg.assert_not_called()
+ ie.current_env().set_cache_manager(None)
+ mocked_reg.assert_called_once()
+ mocked_unreg.assert_called_once()
+
+ @patch('atexit.register')
+ @patch('atexit.unregister')
+ def test_cleanup_reregistered_when_cm_changed(self,
+ mocked_unreg,
+ mocked_reg):
+ ie.new_env(cache.FileBasedCacheManager())
+ mocked_unreg.assert_not_called()
+ ie.current_env().set_cache_manager(cache.FileBasedCacheManager())
+ mocked_unreg.assert_called_once()
+ mocked_reg.assert_has_calls([call(ie.current_env().cleanup),
+ call(ie.current_env().cleanup)])
+
+ @patch('apache_beam.runners.interactive.interactive_environment'
+ '.InteractiveEnvironment.cleanup')
+ def test_cleanup_invoked_when_new_env_replace_not_none_env(self,
+ mocked_cleanup):
+ ie._interactive_beam_env = None
+ ie.new_env(cache.FileBasedCacheManager())
+ mocked_cleanup.assert_not_called()
+ ie.new_env(cache.FileBasedCacheManager())
+ mocked_cleanup.assert_called_once()
+
+ @patch('apache_beam.runners.interactive.interactive_environment'
+ '.InteractiveEnvironment.cleanup')
+ def test_cleanup_invoked_when_cm_changed(self,
+ mocked_cleanup):
+ ie._interactive_beam_env = None
+ ie.new_env(cache.FileBasedCacheManager())
+ ie.current_env().set_cache_manager(cache.FileBasedCacheManager())
+ mocked_cleanup.assert_called_once()
+
+ @patch('atexit.register')
+ @patch('atexit.unregister')
+ def test_cleanup_registered_when_none_cm_changed(self,
+ mocked_unreg,
+ mocked_reg):
+ ie.new_env(None)
+ mocked_reg.assert_not_called()
+ mocked_unreg.assert_not_called()
+ ie.current_env().set_cache_manager(cache.FileBasedCacheManager())
+ mocked_reg.assert_called_once()
+ mocked_unreg.assert_not_called()
+
+ @patch('atexit.register')
+ @patch('atexit.unregister')
+ def test_noop_when_cm_is_not_changed(self,
+ mocked_unreg,
+ mocked_reg):
+ cache_manager = cache.FileBasedCacheManager()
+ ie.new_env(cache_manager)
+ mocked_unreg.assert_not_called()
+ mocked_reg.assert_called_once()
+ ie.current_env().set_cache_manager(cache_manager)
+ mocked_unreg.assert_not_called()
+ mocked_reg.assert_called_once()
+
if __name__ == '__main__':
unittest.main()