| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Test Pipeline, a wrapper of Pipeline for testing purposes.""" |
| |
| # pytype: skip-file |
| |
| import argparse |
| import shlex |
| from unittest import SkipTest |
| |
| from apache_beam.internal import pickler |
| from apache_beam.options.pipeline_options import PipelineOptions |
| from apache_beam.pipeline import Pipeline |
| from apache_beam.runners.runner import PipelineState |
| |
| __all__ = [ |
| 'TestPipeline', |
| ] |
| |
| |
class TestPipeline(Pipeline):
  """:class:`TestPipeline` class is used inside of Beam tests that can be
  configured to run against pipeline runner.

  It has a functionality to parse arguments from command line and build pipeline
  options for tests that run against a pipeline runner and utilize resources
  of the pipeline runner. Those test functions are recommended to be tagged by
  ``@attr("ValidatesRunner")`` annotation.

  In order to configure the test with customized pipeline options from command
  line, system argument ``--test-pipeline-options`` can be used to obtain a
  list of pipeline options. If no options specified, default value will be used.

  For example, use following command line to execute all ValidatesRunner tests::

    python setup.py nosetests -a ValidatesRunner \\
        --test-pipeline-options="--runner=DirectRunner \\
                                 --job_name=myJobName \\
                                 --num_workers=1"

  For example, use assert_that for test validation::

    with TestPipeline() as pipeline:
      pcoll = ...
      assert_that(pcoll, equal_to(...))
  """
  # Command line options read in by pytest.
  # If this is not None, will use as default value for --test-pipeline-options.
  pytest_test_pipeline_options = None

  def __init__(
      self,
      runner=None,
      options=None,
      argv=None,
      is_integration_test=False,
      blocking=True,
      additional_pipeline_args=None):
    """Initialize a pipeline object for test.

    Args:
      runner (~apache_beam.runners.runner.PipelineRunner): An object of type
        :class:`~apache_beam.runners.runner.PipelineRunner` that will be used
        to execute the pipeline. For registered runners, the runner name can be
        specified, otherwise a runner object must be supplied.
      options (~apache_beam.options.pipeline_options.PipelineOptions):
        A configured
        :class:`~apache_beam.options.pipeline_options.PipelineOptions`
        object containing arguments that should be used for running the
        pipeline job.
      argv (List[str]): A list of arguments (such as :data:`sys.argv`) to be
        used for building a
        :class:`~apache_beam.options.pipeline_options.PipelineOptions` object.
        This will only be used if argument **options** is :data:`None`.
      is_integration_test (bool): :data:`True` if the test is an integration
        test, :data:`False` otherwise.
      blocking (bool): Run method will wait until pipeline execution is
        completed.
      additional_pipeline_args (List[str]): additional pipeline arguments to be
        included when constructing the pipeline options object. Any iterable
        of strings is accepted.

    Raises:
      ValueError: if either the runner or options argument is not
        of the expected type (raised by the base class constructor --
        NOTE(review): not visible in this file, confirm against Pipeline).
      unittest.SkipTest: if ``is_integration_test`` is true and no test
        pipeline options were supplied.
    """
    self.is_integration_test = is_integration_test
    # Default value; _parse_test_option_args() below overwrites it with the
    # parsed --not-use-test-runner-api flag.
    self.not_use_test_runner_api = False
    # Normalise to a list so that tuples (or other iterables) can be
    # concatenated with the parsed option list.
    additional_pipeline_args = list(additional_pipeline_args or [])
    self.options_list = (
        self._parse_test_option_args(argv) + additional_pipeline_args)
    self.blocking = blocking
    if options is None:
      options = PipelineOptions(self.options_list)
    super(TestPipeline, self).__init__(runner, options)

  def run(self, test_runner_api=True):
    """Run the pipeline and, when blocking, verify that it finished cleanly.

    Args:
      test_runner_api: Forwarded to the base ``run``; forced to ``False`` when
        ``--not-use-test-runner-api`` was given on the command line.

    Returns:
      The result object returned by the base class ``run``.

    Raises:
      AssertionError: if ``blocking`` is set and the final state is neither
        ``PipelineState.DONE`` nor ``PipelineState.CANCELLED``.
    """
    result = super(TestPipeline, self).run(
        test_runner_api=(
            False if self.not_use_test_runner_api else test_runner_api))
    if self.blocking:
      state = result.wait_until_finish()
      assert state in (PipelineState.DONE, PipelineState.CANCELLED), (
          "Pipeline execution failed.")

    return result

  def get_pipeline_options(self):
    """Return the options object stored on this pipeline (``self._options``)."""
    return self._options

  def _parse_test_option_args(self, argv):
    """Parse value of command line argument: --test-pipeline-options to get
    pipeline options.

    As a side effect, sets ``self.not_use_test_runner_api`` from the
    ``--not-use-test-runner-api`` flag.

    Args:
      argv: An iterable of command line arguments to be used. If not specified
        then sys.argv[1:] will be used as input for parsing arguments
        (argparse default).

    Returns:
      An argument list of options that can be parsed by argparser or directly
      build a pipeline option.

    Raises:
      unittest.SkipTest: if this is an integration test and no test pipeline
        options are available.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--test-pipeline-options',
        type=str,
        action='store',
        help='only run tests providing service options')
    parser.add_argument(
        '--not-use-test-runner-api',
        action='store_true',
        default=False,
        help='whether not to use test-runner-api')
    # Unknown arguments are ignored on purpose: argv usually also carries the
    # test runner's own flags.
    known, _ = parser.parse_known_args(argv)
    # Fall back to the class-level value (set by pytest) when the flag is
    # absent from argv.
    test_pipeline_options = (
        known.test_pipeline_options or
        TestPipeline.pytest_test_pipeline_options)
    if self.is_integration_test and not test_pipeline_options:
      # Skip integration test when argument '--test-pipeline-options' is not
      # specified since nose calls integration tests when runs unit test by
      # 'setup.py test'.
      raise SkipTest(
          'IT is skipped because --test-pipeline-options '
          'is not specified')

    self.not_use_test_runner_api = known.not_use_test_runner_api
    if not test_pipeline_options:
      return []
    return shlex.split(test_pipeline_options)

  def get_full_options_as_args(self, **extra_opts):
    """Get full pipeline options as an argument list.

    Append extra pipeline options to existing option list if provided.
    Falsy values (``None``, ``False``, ``0``, empty strings/containers) are
    skipped. ``True`` is emitted as a bare ``--name`` flag. Any option whose
    name contains ``matcher`` is pickled before appending; per the original
    note it is expected to be unpickled later in the TestRunner.

    Args:
      **extra_opts: Option name to value mapping to append.

    Returns:
      A new list; ``self.options_list`` is not modified.
    """
    options = list(self.options_list)
    for name, value in extra_opts.items():
      if not value:
        continue
      if isinstance(value, bool):
        # Only True reaches this point; False was skipped above.
        options.append('--%s' % name)
      elif 'matcher' in name:
        options.append('--%s=%s' % (name, pickler.dumps(value).decode()))
      else:
        options.append('--%s=%s' % (name, value))
    return options

  def get_option(self, opt_name, bool_option=False):
    """Get a pipeline option value by name

    Args:
      opt_name: The name of the pipeline option, with or without the leading
        ``--``.
      bool_option: If true, the option is parsed as a ``store_true`` flag.

    Returns:
      The option value parsed from the existing option list (which is
      generated by parsing the value of argument `test-pipeline-options` plus
      any additional pipeline args). If the option is not found, None is
      returned for a regular option and False for a ``bool_option``.
    """
    # Strip a leading '--' so that the name can be re-prefixed uniformly below.
    if opt_name[:2] == '--':
      opt_name = opt_name[2:]
    parser = argparse.ArgumentParser()
    # Use a fixed destination: argparse would otherwise turn '-' into '_' when
    # deriving the attribute name, so hyphenated options could not be read
    # back via the raw option name.
    dest = 'requested_option'
    # Option name should start with '--' when it's used for parsing.
    if bool_option:
      parser.add_argument('--' + opt_name, dest=dest, action='store_true')
    else:
      parser.add_argument(
          '--' + opt_name, dest=dest, type=str, action='store')
    known, _ = parser.parse_known_args(self.options_list)
    return getattr(known, dest, None)