| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Tests for apache_beam.runners.interactive.display.pcoll_visualization.""" |
| # pytype: skip-file |
| |
| import unittest |
| from unittest.mock import ANY |
| from unittest.mock import PropertyMock |
| from unittest.mock import patch |
| |
| import pytz |
| |
| import apache_beam as beam |
| from apache_beam.runners import runner |
| from apache_beam.runners.interactive import interactive_beam as ib |
| from apache_beam.runners.interactive import interactive_environment as ie |
| from apache_beam.runners.interactive import interactive_runner as ir |
| from apache_beam.runners.interactive.display import pcoll_visualization as pv |
| from apache_beam.runners.interactive.recording_manager import RecordingManager |
| from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython |
| from apache_beam.transforms.window import GlobalWindow |
| from apache_beam.transforms.window import IntervalWindow |
| from apache_beam.utils.windowed_value import PaneInfo |
| from apache_beam.utils.windowed_value import PaneInfoTiming |
| |
| try: |
| import timeloop |
| except ImportError: |
| pass |
| |
| |
@unittest.skipIf(
    not ie.current_env().is_interactive_ready,
    '[interactive] dependency is not installed.')
class PCollectionVisualizationTest(unittest.TestCase):
  """Tests for PCollectionVisualization and its formatting helpers.

  Each test runs in a fresh interactive environment that watches a small
  pipeline creating the integers 0-4, plus a stream recorded from that
  pipeline's PCollection.
  """
  def setUp(self):
    ie.new_env()
    # The assignments below mutate module-level state that outlives a single
    # test. Register restorers before mutating so that neither later tests
    # nor other test modules observe the overridden values. addCleanup (unlike
    # tearDown) also runs if setUp fails after this point.
    self.addCleanup(
        setattr,
        pv,
        '_pcoll_visualization_ready',
        pv._pcoll_visualization_ready)
    self.addCleanup(
        setattr, ib.options, 'display_timezone', ib.options.display_timezone)
    # Allow unit test to run outside of ipython kernel since we don't test the
    # frontend rendering in unit tests.
    pv._pcoll_visualization_ready = True
    # Generally test the logic where notebook is connected to the assumed
    # ipython kernel by forcefully setting notebook check to True.
    ie.current_env()._is_in_notebook = True
    # The expected strings in the formatter tests below are rendered in this
    # timezone.
    ib.options.display_timezone = pytz.timezone('US/Pacific')

    self._p = beam.Pipeline(ir.InteractiveRunner())
    # pylint: disable=bad-option-value
    self._pcoll = self._p | 'Create' >> beam.Create(range(5))

    ib.watch(self)
    ie.current_env().track_user_pipelines()

    recording_manager = RecordingManager(self._p)
    recording = recording_manager.record([self._pcoll], max_n=5, max_duration=5)
    self._stream = recording.stream(self._pcoll)

  def test_pcoll_visualization_generate_unique_display_id(self):
    pv_1 = pv.PCollectionVisualization(self._stream)
    pv_2 = pv.PCollectionVisualization(self._stream)
    self.assertNotEqual(pv_1._dive_display_id, pv_2._dive_display_id)
    self.assertNotEqual(pv_1._overview_display_id, pv_2._overview_display_id)
    self.assertNotEqual(pv_1._df_display_id, pv_2._df_display_id)

  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
  @patch(
      'apache_beam.runners.interactive.interactive_environment'
      '.InteractiveEnvironment.is_in_notebook',
      new_callable=PropertyMock)
  def test_one_shot_visualization_not_return_handle(
      self, mocked_is_in_notebook, unused):
    mocked_is_in_notebook.return_value = True
    self.assertIsNone(pv.visualize(self._stream, display_facets=True))

  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
  @patch(
      'apache_beam.runners.interactive.interactive_environment'
      '.InteractiveEnvironment.is_in_notebook',
      new_callable=PropertyMock)
  def test_dynamic_plotting_return_handle(self, mocked_is_in_notebook, unused):
    mocked_is_in_notebook.return_value = True
    h = pv.visualize(
        self._stream, dynamic_plotting_interval=1, display_facets=True)
    self.assertIsInstance(h, timeloop.Timeloop)
    # Stop the returned handle so periodic plotting does not continue past
    # the end of this test.
    h.stop()

  @patch('IPython.get_ipython', new_callable=mock_get_ipython)
  @patch(
      'apache_beam.runners.interactive.interactive_environment'
      '.InteractiveEnvironment.is_in_notebook',
      new_callable=PropertyMock)
  def test_no_dynamic_plotting_when_not_in_notebook(
      self, mocked_is_in_notebook, unused):
    mocked_is_in_notebook.return_value = False
    h = pv.visualize(
        self._stream, dynamic_plotting_interval=1, display_facets=True)
    self.assertIsNone(h)

  @patch(
      'apache_beam.runners.interactive.display.pcoll_visualization'
      '.PCollectionVisualization._display_dive')
  @patch(
      'apache_beam.runners.interactive.display.pcoll_visualization'
      '.PCollectionVisualization._display_overview')
  @patch(
      'apache_beam.runners.interactive.display.pcoll_visualization'
      '.PCollectionVisualization._display_dataframe')
  def test_dynamic_plotting_updates_same_display(
      self,
      mocked_display_dataframe,
      mocked_display_overview,
      mocked_display_dive):
    original_pcollection_visualization = pv.PCollectionVisualization(
        self._stream, display_facets=True)
    # Dynamic plotting always creates a new PCollectionVisualization.
    new_pcollection_visualization = pv.PCollectionVisualization(
        self._stream, display_facets=True)
    # The display uses ANY data the moment display is invoked, and updates
    # web elements with ids fetched from the given updating_pv.
    new_pcollection_visualization.display(
        updating_pv=original_pcollection_visualization)
    mocked_display_dataframe.assert_called_once_with(
        ANY, original_pcollection_visualization)
    # Below assertions are still true without newer calls.
    mocked_display_overview.assert_called_once_with(
        ANY, original_pcollection_visualization)
    mocked_display_dive.assert_called_once_with(
        ANY, original_pcollection_visualization)

  def test_auto_stop_dynamic_plotting_when_job_is_terminated(self):
    fake_pipeline_result = runner.PipelineResult(runner.PipelineState.RUNNING)
    ie.current_env().set_pipeline_result(self._p, fake_pipeline_result)
    # When job is running, the dynamic plotting will not be stopped.
    self.assertFalse(ie.current_env().is_terminated(self._p))

    fake_pipeline_result = runner.PipelineResult(runner.PipelineState.DONE)
    ie.current_env().set_pipeline_result(self._p, fake_pipeline_result)
    # When job is done, the dynamic plotting will be stopped.
    self.assertTrue(ie.current_env().is_terminated(self._p))

  @patch('pandas.DataFrame.head')
  def test_display_plain_text_when_kernel_has_no_frontend(self, mocked_head):
    # Simulate a kernel with no notebook frontend by overriding the notebook
    # check that setUp forced to True.
    ie.current_env()._is_in_notebook = False
    self.assertIsNone(pv.visualize(self._stream, display_facets=True))
    mocked_head.assert_called_once()

  def test_event_time_formatter(self):
    # In microseconds: Monday, March 2, 2020 3:14:54 PM GMT-08:00
    event_time_us = 1583190894000000
    self.assertEqual(
        '2020-03-02 15:14:54.000000-0800',
        pv.event_time_formatter(event_time_us))

  def test_event_time_formatter_overflow_lower_bound(self):
    # A large-magnitude negative event time (about 3,170 years before the
    # Unix epoch, i.e. before year 1). It can be valid in Beam but cannot be
    # rendered as a calendar date, so it has no meaning when visualized.
    event_time_us = -100000000000000000
    self.assertEqual('Min Timestamp', pv.event_time_formatter(event_time_us))

  def test_event_time_formatter_overflow_upper_bound(self):
    # Exactly 10000-01-01T00:00:00Z, one second past 9999-12-31T23:59:59Z and
    # therefore beyond the largest (4-digit) year a calendar date can be
    # rendered with. It could mean infinite future in Beam but has no meaning
    # when visualized.
    event_time_us = 253402300800000000
    self.assertEqual('Max Timestamp', pv.event_time_formatter(event_time_us))

  def test_windows_formatter_global(self):
    gw = GlobalWindow()
    self.assertEqual(str(gw), pv.windows_formatter([gw]))

  def test_windows_formatter_interval(self):
    # The unit is second. The window spans 9106 seconds (2h 31m 46s).
    iw = IntervalWindow(start=1583190894, end=1583200000)
    self.assertEqual(
        '2020-03-02 15:14:54.000000-0800 (2h 31m 46s)',
        pv.windows_formatter([iw]))

  def test_pane_info_formatter(self):
    self.assertEqual(
        'Pane 0: Final Early',
        pv.pane_info_formatter(
            PaneInfo(
                is_first=False,
                is_last=True,
                timing=PaneInfoTiming.EARLY,
                index=0,
                nonspeculative_index=0)))
| |
| |
if __name__ == '__main__':
  # Allow running this test module directly as a script.
  unittest.main()