| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| # pytype: skip-file |
| |
| import os |
| import argparse |
| import logging |
| import subprocess |
| import unittest |
| import tempfile |
| import pytest |
| |
| import apache_beam as beam |
| from apache_beam.runners import render |
| |
| default_options = render.RenderOptions._add_argparse_args( |
| argparse.ArgumentParser()).parse_args([]) |
| |
| |
| class RenderRunnerTest(unittest.TestCase): |
| def test_basic_graph(self): |
| p = beam.Pipeline() |
| _ = ( |
| p | beam.Impulse() | beam.Map(lambda _: 2) |
| | 'CustomName' >> beam.Map(lambda x: x * x)) |
| dot = render.PipelineRenderer(p.to_runner_api(), default_options).to_dot() |
| self.assertIn('digraph', dot) |
| self.assertIn('CustomName', dot) |
| self.assertEqual(dot.count('->'), 2) |
| |
| def test_render_config_validation(self): |
| p = beam.Pipeline() |
| _ = ( |
| p | beam.Impulse() | beam.Map(lambda _: 2) |
| | 'CustomName' >> beam.Map(lambda x: x * x)) |
| pipeline_proto = p.to_runner_api() |
| with pytest.raises(ValueError): |
| render.RenderRunner().run_portable_pipeline( |
| pipeline_proto, render.RenderOptions()) |
| |
| def test_side_input(self): |
| p = beam.Pipeline() |
| pcoll = p | beam.Impulse() | beam.FlatMap(lambda x: [1, 2, 3]) |
| dot = render.PipelineRenderer(p.to_runner_api(), default_options).to_dot() |
| self.assertEqual(dot.count('->'), 1) |
| self.assertNotIn('dashed', dot) |
| |
| _ = pcoll | beam.Map( |
| lambda x, side: x * side, side=beam.pvalue.AsList(pcoll)) |
| dot = render.PipelineRenderer(p.to_runner_api(), default_options).to_dot() |
| self.assertEqual(dot.count('->'), 3) |
| self.assertIn('dashed', dot) |
| |
| def test_composite_collapse(self): |
| p = beam.Pipeline() |
| _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x) |
| pipeline_proto = p.to_runner_api() |
| renderer = render.PipelineRenderer(pipeline_proto, default_options) |
| self.assertEqual(renderer.to_dot().count('->'), 8) |
| create_transform_id, = [ |
| id |
| for (id, transform) in pipeline_proto.components.transforms.items() |
| if transform.unique_name == 'Create'] |
| renderer.update(toggle=[create_transform_id]) |
| self.assertEqual(renderer.to_dot().count('->'), 1) |
| |
| |
| class DotRequiringRenderingTest(unittest.TestCase): |
| @classmethod |
| def setUpClass(cls): |
| try: |
| subprocess.run(['dot', '-V'], capture_output=True, check=True) |
| except FileNotFoundError: |
| cls._dot_installed = False |
| else: |
| cls._dot_installed = True |
| |
| def setUp(self) -> None: |
| if not self._dot_installed: # type: ignore[attr-defined] |
| self.skipTest('dot executable not installed') |
| |
| def test_run_portable_pipeline(self): |
| p = beam.Pipeline() |
| _ = ( |
| p | beam.Impulse() | beam.Map(lambda _: 2) |
| | 'CustomName' >> beam.Map(lambda x: x * x)) |
| pipeline_proto = p.to_runner_api() |
| |
| with tempfile.TemporaryDirectory() as tmpdir: |
| svg_path = os.path.join(tmpdir, "my_output.svg") |
| render.RenderRunner().run_portable_pipeline( |
| pipeline_proto, render.RenderOptions(render_output=[svg_path])) |
| assert os.path.exists(svg_path) |
| |
| def test_dot_well_formed(self): |
| p = beam.Pipeline() |
| _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x) |
| pipeline_proto = p.to_runner_api() |
| renderer = render.PipelineRenderer(pipeline_proto, default_options) |
| # Doesn't actually look at the output, but ensures dot executes correctly. |
| renderer.render_data() |
| create_transform_id, = [ |
| id |
| for (id, transform) in pipeline_proto.components.transforms.items() |
| if transform.unique_name == 'Create'] |
| renderer.update(toggle=[create_transform_id]) |
| renderer.render_data() |
| |
| def test_leaf_composite_filter(self): |
| p = beam.Pipeline() |
| _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * x) |
| dot = render.PipelineRenderer( |
| p.to_runner_api(), |
| render.RenderOptions(['--render_leaf_composite_nodes=Create' |
| ])).to_dot() |
| self.assertEqual(dot.count('->'), 1) |
| |
| |
| if __name__ == '__main__': |
| logging.getLogger().setLevel(logging.INFO) |
| unittest.main() |