#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for the pipeline options validator module."""
# pytype: skip-file
import logging
import unittest
from hamcrest import assert_that
from hamcrest import contains_string
from hamcrest import only_contains
from hamcrest.core.base_matcher import BaseMatcher
from parameterized import parameterized
from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator
# Mock runners to use for validations.
# Mock runners to use for validations.
class MockRunners(object):
  """Container namespace for lightweight stand-in runners used by the tests."""
  class DataflowRunner(object):
    """Mimics the Dataflow runner's region-defaulting hook."""
    def get_default_gcp_region(self):
      # Supply a default so individual tests need not pass --region
      # (tests that exercise region validation override this hook).
      return 'us-central1'

  class TestDataflowRunner(DataflowRunner):
    """Test-runner variant; inherits the default-region behavior."""

  class OtherRunner(object):
    """Represents any non-Dataflow (non-service) runner."""
# Matcher that always passes for testing on_success_matcher option
class AlwaysPassMatcher(BaseMatcher):
  """Hamcrest matcher that matches any item unconditionally."""
  def _matches(self, item):
    # Accept everything; used only to provide a picklable, valid matcher.
    return True
class SetupTest(unittest.TestCase):
  """Tests for PipelineOptionsValidator.

  Covers required Dataflow options, GCS path validation, job/project naming,
  worker scaling and placement flags, container image aliases, portability
  environment options, and runner detection.
  """
  def check_errors_for_arguments(self, errors, args):
    """Checks that there is exactly one error for each given argument.

    Each expected argument must appear in exactly one error message; an
    empty return value means errors and expectations matched one-to-one.
    """
    missing = []
    remaining = list(errors)
    for arg in args:
      found = False
      for error in remaining:
        if arg in error:
          # Consume the matched error so one error satisfies at most one arg.
          remaining.remove(error)
          found = True
          break
      if not found:
        missing.append('Missing error for: %s.' % arg)
    # Return missing and remaining (not matched) errors.
    return missing + remaining

  def test_local_runner(self):
    # Non-service runners require no GCP options at all.
    runner = MockRunners.OtherRunner()
    options = PipelineOptions([])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)

  def test_missing_required_options(self):
    options = PipelineOptions([''])
    runner = MockRunners.DataflowRunner()
    # Remove default region for this test.
    runner.get_default_gcp_region = lambda: None
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(
        self.check_errors_for_arguments(
            errors, ['project', 'staging_location', 'temp_location', 'region']),
        [])

  @parameterized.expand([
      (None, 'gs://foo/bar', []),
      (None, None, ['staging_location', 'temp_location']),
      ('gs://foo/bar', None, []),
      ('gs://foo/bar', 'gs://ABC/bar', []),
      ('gcs:/foo/bar', 'gs://foo/bar', []),
      ('gs:/foo/bar', 'gs://foo/bar', []),
      ('gs://ABC/bar', 'gs://foo/bar', []),
      ('gs://ABC/bar', 'gs://BCD/bar', ['temp_location', 'staging_location']),
      ('gs://foo', 'gs://foo/bar', []),
      ('gs://foo/', 'gs://foo/bar', []),
      ('gs://foo/bar', 'gs://foo/bar', []),
  ])
  @unittest.skip('Not compatible with new GCS client. See GH issue #26335.')
  def test_gcs_path(self, temp_location, staging_location, expected_error_args):
    def get_validator(_temp_location, _staging_location):
      options = ['--project=example:example', '--job_name=job']
      if _temp_location is not None:
        options.append('--temp_location=' + _temp_location)
      if _staging_location is not None:
        options.append('--staging_location=' + _staging_location)
      pipeline_options = PipelineOptions(options)
      runner = MockRunners.DataflowRunner()
      validator = PipelineOptionsValidator(pipeline_options, runner)
      return validator

    errors = get_validator(temp_location, staging_location).validate()
    self.assertEqual(
        self.check_errors_for_arguments(errors, expected_error_args), [])

  @parameterized.expand([(None, ['project']), ('12345', ['project']),
                         ('FOO', ['project']), ('foo:BAR', ['project']),
                         ('fo', ['project']), ('foo', []), ('foo:bar', [])])
  def test_project(self, project, expected_error_args):
    def get_validator(_project):
      options = [
          '--job_name=job',
          '--staging_location=gs://foo/bar',
          '--temp_location=gs://foo/bar'
      ]
      if _project is not None:
        options.append('--project=' + _project)
      pipeline_options = PipelineOptions(options)
      runner = MockRunners.DataflowRunner()
      validator = PipelineOptionsValidator(pipeline_options, runner)
      return validator

    errors = get_validator(project).validate()
    self.assertEqual(
        self.check_errors_for_arguments(errors, expected_error_args), [])

  @parameterized.expand([(None, []), ('12345', ['job_name']),
                         ('FOO', ['job_name']), ('foo:bar', ['job_name']),
                         ('fo', []), ('foo', [])])
  def test_job_name(self, job_name, expected_error_args):
    def get_validator(_job_name):
      options = [
          '--project=example:example',
          '--staging_location=gs://foo/bar',
          '--temp_location=gs://foo/bar'
      ]
      if _job_name is not None:
        options.append('--job_name=' + _job_name)
      pipeline_options = PipelineOptions(options)
      runner = MockRunners.DataflowRunner()
      validator = PipelineOptionsValidator(pipeline_options, runner)
      return validator

    errors = get_validator(job_name).validate()
    self.assertEqual(
        self.check_errors_for_arguments(errors, expected_error_args), [])

  @parameterized.expand([(None, []), ('1', []), ('0', ['num_workers']),
                         ('-1', ['num_workers'])])
  def test_num_workers(self, num_workers, expected_error_args):
    def get_validator(_num_workers):
      options = [
          '--project=example:example',
          '--job_name=job',
          '--staging_location=gs://foo/bar',
          '--temp_location=gs://foo/bar'
      ]
      if _num_workers is not None:
        options.append('--num_workers=' + _num_workers)
      pipeline_options = PipelineOptions(options)
      runner = MockRunners.DataflowRunner()
      validator = PipelineOptionsValidator(pipeline_options, runner)
      return validator

    errors = get_validator(num_workers).validate()
    self.assertEqual(
        self.check_errors_for_arguments(errors, expected_error_args), [])

  @parameterized.expand([
      (MockRunners.OtherRunner(), [], False),
      (
          MockRunners.OtherRunner(),
          ['--dataflow_endpoint=https://dataflow.googleapis.com'],
          False,
      ),
      (
          MockRunners.OtherRunner(),
          ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
          False,
      ),
      (MockRunners.DataflowRunner(), [], True),
      (
          MockRunners.DataflowRunner(),
          ['--dataflow_endpoint=https://another.service.com'],
          True,
      ),
      (
          MockRunners.DataflowRunner(),
          ['--dataflow_endpoint=https://dataflow.googleapis.com'],
          True,
      ),
      (
          MockRunners.DataflowRunner(),
          ['--dataflow_endpoint=http://localhost:1000'],
          False,
      ),
      (
          MockRunners.DataflowRunner(),
          ['--dataflow_endpoint=foo: //dataflow. googleapis. com'],
          True,
      ),
      (MockRunners.DataflowRunner(), [], True),
  ])
  def test_is_service_runner(self, runner, options, expected):
    validator = PipelineOptionsValidator(PipelineOptions(options), runner)
    self.assertEqual(validator.is_service_runner(), expected)

  def test_dataflow_job_file_and_template_location_mutually_exclusive(self):
    runner = MockRunners.OtherRunner()
    options = PipelineOptions(
        ['--template_location', 'abc', '--dataflow_job_file', 'def'])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertTrue(errors)

  def test_validate_template_location(self):
    runner = MockRunners.OtherRunner()
    options = PipelineOptions([
        '--template_location',
        'abc',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertFalse(errors)

  def test_validate_dataflow_job_file(self):
    runner = MockRunners.OtherRunner()
    options = PipelineOptions(['--dataflow_job_file', 'abc'])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertFalse(errors)

  def test_num_workers_is_positive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--num_workers=-1',
        '--worker_region=us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('num_workers', errors[0])
    self.assertIn('-1', errors[0])

  def test_max_num_workers_is_positive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--max_num_workers=-1',
        '--worker_region=us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('max_num_workers', errors[0])
    self.assertIn('-1', errors[0])

  def test_num_workers_cannot_exceed_max_num_workers(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--num_workers=43',
        '--max_num_workers=42',
        '--worker_region=us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('num_workers', errors[0])
    self.assertIn('43', errors[0])
    self.assertIn('max_num_workers', errors[0])
    self.assertIn('42', errors[0])

  def test_num_workers_can_equal_max_num_workers(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--num_workers=42',
        '--max_num_workers=42',
        '--worker_region=us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)

  def test_zone_and_worker_region_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--zone',
        'us-east1-b',
        '--worker_region',
        'us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('zone', errors[0])
    self.assertIn('worker_region', errors[0])

  def test_zone_and_worker_zone_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--zone',
        'us-east1-b',
        '--worker_zone',
        'us-east1-c',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('zone', errors[0])
    self.assertIn('worker_zone', errors[0])

  def test_experiment_region_and_worker_region_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--experiments',
        'worker_region=us-west1',
        '--worker_region',
        'us-east1',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('experiment', errors[0])
    self.assertIn('worker_region', errors[0])

  def test_region_and_worker_zone_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--worker_region=us-west1',
        '--worker_zone=us-east1-b',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('worker_region', errors[0])
    self.assertIn('worker_zone', errors[0])

  def test_programmatically_set_experiment_passed_as_string(self):
    # These options must be lists; a bare string should produce one error each.
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions(
        project='example.com:example',
        temp_location='gs://foo/bar/',
        experiments='enable_prime',
        dataflow_service_options='use_runner_v2',
    )
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 2)
    self.assertIn('experiments', errors[0])
    self.assertIn('dataflow_service_options', errors[1])

  def test_programmatically_set_experiment_passed_as_list(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions(
        project='example.com:example',
        temp_location='gs://foo/bar/',
        experiments=['enable_prime'],
        dataflow_service_options=['use_runner_v2'],
    )
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)
    self.assertEqual(
        options.view_as(DebugOptions).experiments, ['enable_prime'])
    self.assertEqual(
        options.view_as(GoogleCloudOptions).dataflow_service_options,
        ['use_runner_v2'])

  def test_worker_region_and_worker_zone_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--worker_region',
        'us-east1',
        '--worker_zone',
        'us-east1-b',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('worker_region', errors[0])
    self.assertIn('worker_zone', errors[0])

  def test_zone_alias_worker_zone(self):
    # Validation migrates the deprecated --zone flag into worker_zone.
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--zone=us-east1-b',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)
    self.assertIsNone(options.view_as(WorkerOptions).zone)
    self.assertEqual(options.view_as(WorkerOptions).worker_zone, 'us-east1-b')

  def test_region_optional_for_non_service_runner(self):
    runner = MockRunners.OtherRunner()
    # Remove default region for this test.
    runner.get_default_gcp_region = lambda: None
    options = PipelineOptions([
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)

  def test_dataflow_endpoint_is_a_url(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--project=example:example',
        '--temp_location=gs://foo/bar',
        '--staging_location=gs://foo/baz',
        '--dataflow_endpoint=foo and bar'
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1, errors)
    self.assertIn("Invalid url (foo and bar)", errors[0])

  def test_alias_sdk_container_to_worker_harness(self):
    # --sdk_container_image should populate both image options.
    runner = MockRunners.DataflowRunner()
    test_image = "SDK_IMAGE"
    options = PipelineOptions([
        '--sdk_container_image=%s' % test_image,
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)
    self.assertEqual(
        options.view_as(WorkerOptions).worker_harness_container_image,
        test_image)
    self.assertEqual(
        options.view_as(WorkerOptions).sdk_container_image, test_image)

  def test_alias_worker_harness_sdk_container_image(self):
    # The deprecated --worker_harness_container_image flag should populate
    # both image options as well.
    runner = MockRunners.DataflowRunner()
    test_image = "WORKER_HARNESS"
    options = PipelineOptions([
        '--worker_harness_container_image=%s' % test_image,
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)
    self.assertEqual(
        options.view_as(WorkerOptions).worker_harness_container_image,
        test_image)
    self.assertEqual(
        options.view_as(WorkerOptions).sdk_container_image, test_image)

  def test_worker_harness_sdk_container_image_mutually_exclusive(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--worker_harness_container_image=WORKER',
        '--sdk_container_image=SDK_ONLY',
        '--project=example:example',
        '--temp_location=gs://foo/bar',
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('sdk_container_image', errors[0])
    self.assertIn('worker_harness_container_image', errors[0])

  def test_prebuild_sdk_container_base_image_disallowed(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--project=example:example',
        '--temp_location=gs://foo/bar',
        '--prebuild_sdk_container_base_image=gcr.io/foo:bar'
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 1)
    self.assertIn('prebuild_sdk_container_base_image', errors[0])
    self.assertIn('sdk_container_image', errors[0])

  def test_prebuild_sdk_container_base_allowed_if_matches_custom_image(self):
    runner = MockRunners.DataflowRunner()
    options = PipelineOptions([
        '--project=example:example',
        '--temp_location=gs://foo/bar',
        '--sdk_container_image=gcr.io/foo:bar',
        '--prebuild_sdk_container_base_image=gcr.io/foo:bar'
    ])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertEqual(len(errors), 0)

  @parameterized.expand([(None, []), (pickler.dumps(AlwaysPassMatcher()), []),
                         (b'abc', ['on_success_matcher']),
                         (pickler.dumps(object), ['on_success_matcher'])])
  def test_test_matcher(self, on_success_matcher, expected_error_args):
    # NOTE: the expected-errors parameter was previously named `errors` and
    # shadowed by the local validation result, which made the final assertion
    # compare the validation errors against themselves (always passing).
    def get_validator(matcher):
      options = [
          '--project=example:example',
          '--job_name=job',
          '--staging_location=gs://foo/bar',
          '--temp_location=gs://foo/bar',
      ]
      if matcher:
        options.append('%s=%s' % ('--on_success_matcher', matcher.decode()))
      pipeline_options = PipelineOptions(options)
      runner = MockRunners.TestDataflowRunner()
      return PipelineOptionsValidator(pipeline_options, runner)

    errors = get_validator(on_success_matcher).validate()
    self.assertEqual(
        self.check_errors_for_arguments(errors, expected_error_args), [])

  def test_transform_name_mapping_without_update(self):
    options = [
        '--project=example:example',
        '--staging_location=gs://foo/bar',
        '--temp_location=gs://foo/bar',
        '--transform_name_mapping={\"fromPardo\":\"toPardo\"}'
    ]
    pipeline_options = PipelineOptions(options)
    runner = MockRunners.DataflowRunner()
    validator = PipelineOptionsValidator(pipeline_options, runner)
    errors = validator.validate()
    assert_that(
        errors,
        only_contains(
            contains_string(
                'Transform name mapping option is only useful when '
                '--update and --streaming is specified')))

  def test_transform_name_mapping_invalid_format(self):
    # Mapping values must be strings; 123 is invalid.
    options = [
        '--project=example:example',
        '--staging_location=gs://foo/bar',
        '--temp_location=gs://foo/bar',
        '--update',
        '--job_name=test',
        '--streaming',
        '--transform_name_mapping={\"fromPardo\":123}'
    ]
    pipeline_options = PipelineOptions(options)
    runner = MockRunners.DataflowRunner()
    validator = PipelineOptionsValidator(pipeline_options, runner)
    errors = validator.validate()
    assert_that(
        errors,
        only_contains(
            contains_string('Invalid transform name mapping format.')))

  def test_type_check_additional(self):
    runner = MockRunners.OtherRunner()
    options = PipelineOptions(['--type_check_additional=all'])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertFalse(errors)

    options = PipelineOptions(['--type_check_additional='])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertFalse(errors)

  def test_type_check_additional_unrecognized_feature(self):
    runner = MockRunners.OtherRunner()
    options = PipelineOptions(['--type_check_additional=all,dfgdf'])
    validator = PipelineOptionsValidator(options, runner)
    errors = validator.validate()
    self.assertTrue(errors)

  @parameterized.expand([
      (['--environment_type=dOcKeR'], []),
      ([
          '--environment_type=dOcKeR',
          '--environment_options=docker_container_image=foo'
      ], []),
      (['--environment_type=dOcKeR', '--environment_config=foo'], []),
      ([
          '--environment_type=dOcKeR',
          '--environment_options=docker_container_image=foo',
          '--environment_config=foo'
      ], ['environment_config']),
      ([
          '--environment_type=dOcKeR',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=external_service_address=foo'
      ], ['process_command', 'process_variables', 'external_service_address']),
      (['--environment_type=pRoCeSs'], ['process_command']),
      ([
          '--environment_type=pRoCeSs',
          '--environment_options=process_command=foo'
      ], []),
      (['--environment_type=pRoCeSs', '--environment_config=foo'], []),
      ([
          '--environment_type=pRoCeSs',
          '--environment_options=process_command=foo',
          '--environment_config=foo'
      ], ['environment_config']),
      ([
          '--environment_type=pRoCeSs',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=docker_container_image=foo',
          '--environment_options=external_service_address=foo'
      ], ['docker_container_image', 'external_service_address']),
      (['--environment_type=eXtErNaL'], ['external_service_address']),
      ([
          '--environment_type=eXtErNaL',
          '--environment_options=external_service_address=foo'
      ], []),
      (['--environment_type=eXtErNaL', '--environment_config=foo'], []),
      ([
          '--environment_type=eXtErNaL',
          '--environment_options=external_service_address=foo',
          '--environment_config=foo'
      ], ['environment_config']),
      ([
          '--environment_type=eXtErNaL',
          '--environment_options=external_service_address=foo',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=docker_container_image=foo'
      ], ['process_command', 'process_variables', 'docker_container_image']),
      (['--environment_type=lOoPbACk'], []),
      (['--environment_type=lOoPbACk',
        '--environment_config=foo'], ['environment_config']),
      ([
          '--environment_type=lOoPbACk',
          '--environment_options=docker_container_image=foo',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=external_service_address=foo'
      ],
       [
           'docker_container_image',
           'process_command',
           'process_variables',
           'external_service_address'
       ]),
      (['--environment_type=beam:env:foo:v1'], []),
      (['--environment_type=beam:env:foo:v1', '--environment_config=foo'], []),
      ([
          '--environment_type=beam:env:foo:v1',
          '--environment_options=docker_container_image=foo',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=external_service_address=foo'
      ],
       [
           'docker_container_image',
           'process_command',
           'process_variables',
           'external_service_address'
       ]),
      ([
          '--environment_options=docker_container_image=foo',
          '--environment_options=process_command=foo',
          '--environment_options=process_variables=foo=bar',
          '--environment_options=external_service_address=foo'
      ],
       [
           'docker_container_image',
           'process_command',
           'process_variables',
           'external_service_address'
       ])
  ])
  def test_environment_options(self, options, expected_error_args):
    errors = []
    validator = PipelineOptionsValidator(
        PipelineOptions(options), MockRunners.OtherRunner())
    validation_result = validator.validate()
    validation_errors = self.check_errors_for_arguments(
        validation_result, expected_error_args)
    if validation_errors:
      errors.append(
          'Options "%s" had unexpected validation results: "%s"' %
          (' '.join(options), ' '.join(validation_errors)))
    self.assertEqual(errors, [], expected_error_args)
if __name__ == '__main__':
  # Surface INFO-level logs when the tests are run directly.
  logging.getLogger().setLevel(logging.INFO)
  unittest.main()