blob: 0eea8d7d4413b899137d38efbc1ff9ba52b4a092 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for the pipeline options validator module."""
# pytype: skip-file
from __future__ import absolute_import
import logging
import unittest
from builtins import object
from hamcrest import assert_that
from hamcrest import contains_string
from hamcrest import only_contains
from hamcrest.core.base_matcher import BaseMatcher
from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator
# Mock runners to use for validations.
class MockRunners(object):
class DataflowRunner(object):
def get_default_gcp_region(self):
# Return a default so we don't have to specify --region in every test
# (unless specifically testing it).
return 'us-central1'
class TestDataflowRunner(DataflowRunner):
pass
class OtherRunner(object):
pass
# Matcher that always passes for testing on_success_matcher option
class AlwaysPassMatcher(BaseMatcher):
def _matches(self, item):
return True
class SetupTest(unittest.TestCase):
def check_errors_for_arguments(self, errors, args):
"""Checks that there is exactly one error for each given argument."""
missing = []
remaining = list(errors)
for arg in args:
found = False
for error in remaining:
if arg in error:
remaining.remove(error)
found = True
break
if not found:
missing.append('Missing error for: %s.' % arg)
# Return missing and remaining (not matched) errors.
return missing + remaining
def test_local_runner(self):
runner = MockRunners.OtherRunner()
options = PipelineOptions([])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 0)
def test_missing_required_options(self):
options = PipelineOptions([''])
runner = MockRunners.DataflowRunner()
# Remove default region for this test.
runner.get_default_gcp_region = lambda: None
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(
self.check_errors_for_arguments(
errors, ['project', 'staging_location', 'temp_location', 'region']),
[])
def test_gcs_path(self):
def get_validator(temp_location, staging_location):
options = ['--project=example:example', '--job_name=job']
if temp_location is not None:
options.append('--temp_location=' + temp_location)
if staging_location is not None:
options.append('--staging_location=' + staging_location)
pipeline_options = PipelineOptions(options)
runner = MockRunners.DataflowRunner()
validator = PipelineOptionsValidator(pipeline_options, runner)
return validator
test_cases = [
{
'temp_location': None,
'staging_location': 'gs://foo/bar',
'errors': ['temp_location']
},
{
'temp_location': None,
'staging_location': None,
'errors': ['staging_location', 'temp_location']
},
{
'temp_location': 'gs://foo/bar',
'staging_location': None,
'errors': []
},
{
'temp_location': 'gs://foo/bar',
'staging_location': 'gs://ABC/bar',
'errors': ['staging_location']
},
{
'temp_location': 'gcs:/foo/bar',
'staging_location': 'gs://foo/bar',
'errors': ['temp_location']
},
{
'temp_location': 'gs:/foo/bar',
'staging_location': 'gs://foo/bar',
'errors': ['temp_location']
},
{
'temp_location': 'gs://ABC/bar',
'staging_location': 'gs://foo/bar',
'errors': ['temp_location']
},
{
'temp_location': 'gs://ABC/bar',
'staging_location': 'gs://foo/bar',
'errors': ['temp_location']
},
{
'temp_location': 'gs://foo',
'staging_location': 'gs://foo/bar',
'errors': ['temp_location']
},
{
'temp_location': 'gs://foo/',
'staging_location': 'gs://foo/bar',
'errors': []
},
{
'temp_location': 'gs://foo/bar',
'staging_location': 'gs://foo/bar',
'errors': []
},
]
for case in test_cases:
errors = get_validator(case['temp_location'],
case['staging_location']).validate()
self.assertEqual(
self.check_errors_for_arguments(errors, case['errors']), [])
def test_project(self):
def get_validator(project):
options = [
'--job_name=job',
'--staging_location=gs://foo/bar',
'--temp_location=gs://foo/bar'
]
if project is not None:
options.append('--project=' + project)
pipeline_options = PipelineOptions(options)
runner = MockRunners.DataflowRunner()
validator = PipelineOptionsValidator(pipeline_options, runner)
return validator
test_cases = [
{
'project': None, 'errors': ['project']
},
{
'project': '12345', 'errors': ['project']
},
{
'project': 'FOO', 'errors': ['project']
},
{
'project': 'foo:BAR', 'errors': ['project']
},
{
'project': 'fo', 'errors': ['project']
},
{
'project': 'foo', 'errors': []
},
{
'project': 'foo:bar', 'errors': []
},
]
for case in test_cases:
errors = get_validator(case['project']).validate()
self.assertEqual(
self.check_errors_for_arguments(errors, case['errors']), [])
def test_job_name(self):
def get_validator(job_name):
options = [
'--project=example:example',
'--staging_location=gs://foo/bar',
'--temp_location=gs://foo/bar'
]
if job_name is not None:
options.append('--job_name=' + job_name)
pipeline_options = PipelineOptions(options)
runner = MockRunners.DataflowRunner()
validator = PipelineOptionsValidator(pipeline_options, runner)
return validator
test_cases = [
{
'job_name': None, 'errors': []
},
{
'job_name': '12345', 'errors': ['job_name']
},
{
'job_name': 'FOO', 'errors': ['job_name']
},
{
'job_name': 'foo:bar', 'errors': ['job_name']
},
{
'job_name': 'fo', 'errors': []
},
{
'job_name': 'foo', 'errors': []
},
]
for case in test_cases:
errors = get_validator(case['job_name']).validate()
self.assertEqual(
self.check_errors_for_arguments(errors, case['errors']), [])
def test_num_workers(self):
def get_validator(num_workers):
options = [
'--project=example:example',
'--job_name=job',
'--staging_location=gs://foo/bar',
'--temp_location=gs://foo/bar'
]
if num_workers is not None:
options.append('--num_workers=' + num_workers)
pipeline_options = PipelineOptions(options)
runner = MockRunners.DataflowRunner()
validator = PipelineOptionsValidator(pipeline_options, runner)
return validator
test_cases = [
{
'num_workers': None, 'errors': []
},
{
'num_workers': '1', 'errors': []
},
{
'num_workers': '0', 'errors': ['num_workers']
},
{
'num_workers': '-1', 'errors': ['num_workers']
},
]
for case in test_cases:
errors = get_validator(case['num_workers']).validate()
self.assertEqual(
self.check_errors_for_arguments(errors, case['errors']), [])
def test_is_service_runner(self):
test_cases = [
{
'runner': MockRunners.OtherRunner(),
'options': [],
'expected': False,
},
{
'runner': MockRunners.OtherRunner(),
'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
'expected': False,
},
{
'runner': MockRunners.OtherRunner(),
'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
'expected': False,
},
{
'runner': MockRunners.DataflowRunner(),
'options': ['--dataflow_endpoint=https://another.service.com'],
'expected': False,
},
{
'runner': MockRunners.DataflowRunner(),
'options': ['--dataflow_endpoint=https://another.service.com/'],
'expected': False,
},
{
'runner': MockRunners.DataflowRunner(),
'options': ['--dataflow_endpoint=https://dataflow.googleapis.com'],
'expected': True,
},
{
'runner': MockRunners.DataflowRunner(),
'options': ['--dataflow_endpoint=https://dataflow.googleapis.com/'],
'expected': True,
},
{
'runner': MockRunners.DataflowRunner(),
'options': [],
'expected': True,
},
]
for case in test_cases:
validator = PipelineOptionsValidator(
PipelineOptions(case['options']), case['runner'])
self.assertEqual(validator.is_service_runner(), case['expected'])
def test_dataflow_job_file_and_template_location_mutually_exclusive(self):
runner = MockRunners.OtherRunner()
options = PipelineOptions(
['--template_location', 'abc', '--dataflow_job_file', 'def'])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertTrue(errors)
def test_validate_template_location(self):
runner = MockRunners.OtherRunner()
options = PipelineOptions([
'--template_location',
'abc',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertFalse(errors)
def test_validate_dataflow_job_file(self):
runner = MockRunners.OtherRunner()
options = PipelineOptions(['--dataflow_job_file', 'abc'])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertFalse(errors)
def test_zone_and_worker_region_mutually_exclusive(self):
runner = MockRunners.DataflowRunner()
options = PipelineOptions([
'--zone',
'us-east1-b',
'--worker_region',
'us-east1',
'--project=example:example',
'--temp_location=gs://foo/bar',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 1)
self.assertIn('zone', errors[0])
self.assertIn('worker_region', errors[0])
def test_zone_and_worker_zone_mutually_exclusive(self):
runner = MockRunners.DataflowRunner()
options = PipelineOptions([
'--zone',
'us-east1-b',
'--worker_zone',
'us-east1-c',
'--project=example:example',
'--temp_location=gs://foo/bar',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 1)
self.assertIn('zone', errors[0])
self.assertIn('worker_zone', errors[0])
def test_experiment_region_and_worker_region_mutually_exclusive(self):
runner = MockRunners.DataflowRunner()
options = PipelineOptions([
'--experiments',
'worker_region=us-west1',
'--worker_region',
'us-east1',
'--project=example:example',
'--temp_location=gs://foo/bar',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 1)
self.assertIn('experiment', errors[0])
self.assertIn('worker_region', errors[0])
def test_experiment_region_and_worker_zone_mutually_exclusive(self):
runner = MockRunners.DataflowRunner()
options = PipelineOptions([
'--experiments',
'worker_region=us-west1',
'--worker_zone',
'us-east1-b',
'--project=example:example',
'--temp_location=gs://foo/bar',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 1)
self.assertIn('experiment', errors[0])
self.assertIn('worker_region', errors[0])
self.assertIn('worker_zone', errors[0])
def test_worker_region_and_worker_zone_mutually_exclusive(self):
runner = MockRunners.DataflowRunner()
options = PipelineOptions([
'--worker_region',
'us-east1',
'--worker_zone',
'us-east1-b',
'--project=example:example',
'--temp_location=gs://foo/bar',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 1)
self.assertIn('worker_region', errors[0])
self.assertIn('worker_zone', errors[0])
def test_zone_alias_worker_zone(self):
runner = MockRunners.DataflowRunner()
options = PipelineOptions([
'--zone=us-east1-b',
'--project=example:example',
'--temp_location=gs://foo/bar',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 0)
self.assertIsNone(options.view_as(WorkerOptions).zone)
self.assertEqual(options.view_as(WorkerOptions).worker_zone, 'us-east1-b')
def test_region_optional_for_non_service_runner(self):
runner = MockRunners.DataflowRunner()
# Remove default region for this test.
runner.get_default_gcp_region = lambda: None
options = PipelineOptions([
'--project=example:example',
'--temp_location=gs://foo/bar',
'--dataflow_endpoint=http://localhost:20281',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 0)
def test_alias_sdk_container_to_worker_harness(self):
runner = MockRunners.DataflowRunner()
test_image = "SDK_IMAGE"
options = PipelineOptions([
'--sdk_container_image=%s' % test_image,
'--project=example:example',
'--temp_location=gs://foo/bar',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 0)
self.assertEqual(
options.view_as(WorkerOptions).worker_harness_container_image,
test_image)
self.assertEqual(
options.view_as(WorkerOptions).sdk_container_image, test_image)
def test_alias_worker_harness_sdk_container_image(self):
runner = MockRunners.DataflowRunner()
test_image = "WORKER_HARNESS"
options = PipelineOptions([
'--worker_harness_container_image=%s' % test_image,
'--project=example:example',
'--temp_location=gs://foo/bar',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 0)
self.assertEqual(
options.view_as(WorkerOptions).worker_harness_container_image,
test_image)
self.assertEqual(
options.view_as(WorkerOptions).sdk_container_image, test_image)
def test_worker_harness_sdk_container_image_mutually_exclusive(self):
runner = MockRunners.DataflowRunner()
options = PipelineOptions([
'--worker_harness_container_image=WORKER',
'--sdk_container_image=SDK_ONLY',
'--project=example:example',
'--temp_location=gs://foo/bar',
])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertEqual(len(errors), 1)
self.assertIn('sdk_container_image', errors[0])
self.assertIn('worker_harness_container_image', errors[0])
def test_test_matcher(self):
def get_validator(matcher):
options = [
'--project=example:example',
'--job_name=job',
'--staging_location=gs://foo/bar',
'--temp_location=gs://foo/bar',
]
if matcher:
options.append('%s=%s' % ('--on_success_matcher', matcher.decode()))
pipeline_options = PipelineOptions(options)
runner = MockRunners.TestDataflowRunner()
return PipelineOptionsValidator(pipeline_options, runner)
test_case = [
{
'on_success_matcher': None, 'errors': []
},
{
'on_success_matcher': pickler.dumps(AlwaysPassMatcher()),
'errors': []
},
{
'on_success_matcher': b'abc', 'errors': ['on_success_matcher']
},
{
'on_success_matcher': pickler.dumps(object),
'errors': ['on_success_matcher']
},
]
for case in test_case:
errors = get_validator(case['on_success_matcher']).validate()
self.assertEqual(
self.check_errors_for_arguments(errors, case['errors']), [])
def test_transform_name_mapping_without_update(self):
options = [
'--project=example:example',
'--staging_location=gs://foo/bar',
'--temp_location=gs://foo/bar',
'--transform_name_mapping={\"fromPardo\":\"toPardo\"}'
]
pipeline_options = PipelineOptions(options)
runner = MockRunners.DataflowRunner()
validator = PipelineOptionsValidator(pipeline_options, runner)
errors = validator.validate()
assert_that(
errors,
only_contains(
contains_string(
'Transform name mapping option is only useful when '
'--update and --streaming is specified')))
def test_transform_name_mapping_invalid_format(self):
options = [
'--project=example:example',
'--staging_location=gs://foo/bar',
'--temp_location=gs://foo/bar',
'--update',
'--job_name=test',
'--streaming',
'--transform_name_mapping={\"fromPardo\":123}'
]
pipeline_options = PipelineOptions(options)
runner = MockRunners.DataflowRunner()
validator = PipelineOptionsValidator(pipeline_options, runner)
errors = validator.validate()
assert_that(
errors,
only_contains(
contains_string('Invalid transform name mapping format.')))
def test_type_check_additional(self):
runner = MockRunners.OtherRunner()
options = PipelineOptions(['--type_check_additional=all'])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertFalse(errors)
options = PipelineOptions(['--type_check_additional='])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertFalse(errors)
def test_type_check_additional_unrecognized_feature(self):
runner = MockRunners.OtherRunner()
options = PipelineOptions(['--type_check_additional=all,dfgdf'])
validator = PipelineOptionsValidator(options, runner)
errors = validator.validate()
self.assertTrue(errors)
def test_environment_options(self):
test_cases = [
{
'options': ['--environment_type=dOcKeR'], 'errors': []
},
{
'options': [
'--environment_type=dOcKeR',
'--environment_options=docker_container_image=foo'
],
'errors': []
},
{
'options': [
'--environment_type=dOcKeR', '--environment_config=foo'
],
'errors': []
},
{
'options': [
'--environment_type=dOcKeR',
'--environment_options=docker_container_image=foo',
'--environment_config=foo'
],
'errors': ['environment_config']
},
{
'options': [
'--environment_type=dOcKeR',
'--environment_options=process_command=foo',
'--environment_options=process_variables=foo=bar',
'--environment_options=external_service_address=foo'
],
'errors': [
'process_command',
'process_variables',
'external_service_address'
]
},
{
'options': ['--environment_type=pRoCeSs'],
'errors': ['process_command']
},
{
'options': [
'--environment_type=pRoCeSs',
'--environment_options=process_command=foo'
],
'errors': []
},
{
'options': [
'--environment_type=pRoCeSs', '--environment_config=foo'
],
'errors': []
},
{
'options': [
'--environment_type=pRoCeSs',
'--environment_options=process_command=foo',
'--environment_config=foo'
],
'errors': ['environment_config']
},
{
'options': [
'--environment_type=pRoCeSs',
'--environment_options=process_command=foo',
'--environment_options=process_variables=foo=bar',
'--environment_options=docker_container_image=foo',
'--environment_options=external_service_address=foo'
],
'errors': ['docker_container_image', 'external_service_address']
},
{
'options': ['--environment_type=eXtErNaL'],
'errors': ['external_service_address']
},
{
'options': [
'--environment_type=eXtErNaL',
'--environment_options=external_service_address=foo'
],
'errors': []
},
{
'options': [
'--environment_type=eXtErNaL', '--environment_config=foo'
],
'errors': []
},
{
'options': [
'--environment_type=eXtErNaL',
'--environment_options=external_service_address=foo',
'--environment_config=foo'
],
'errors': ['environment_config']
},
{
'options': [
'--environment_type=eXtErNaL',
'--environment_options=external_service_address=foo',
'--environment_options=process_command=foo',
'--environment_options=process_variables=foo=bar',
'--environment_options=docker_container_image=foo',
],
'errors': [
'process_command',
'process_variables',
'docker_container_image'
]
},
{
'options': ['--environment_type=lOoPbACk'], 'errors': []
},
{
'options': [
'--environment_type=lOoPbACk', '--environment_config=foo'
],
'errors': ['environment_config']
},
{
'options': [
'--environment_type=lOoPbACk',
'--environment_options=docker_container_image=foo',
'--environment_options=process_command=foo',
'--environment_options=process_variables=foo=bar',
'--environment_options=external_service_address=foo',
],
'errors': [
'docker_container_image',
'process_command',
'process_variables',
'external_service_address'
]
},
{
'options': ['--environment_type=beam:env:foo:v1'], 'errors': []
},
{
'options': [
'--environment_type=beam:env:foo:v1',
'--environment_config=foo'
],
'errors': []
},
{
'options': [
'--environment_type=beam:env:foo:v1',
'--environment_options=docker_container_image=foo',
'--environment_options=process_command=foo',
'--environment_options=process_variables=foo=bar',
'--environment_options=external_service_address=foo',
],
'errors': [
'docker_container_image',
'process_command',
'process_variables',
'external_service_address'
]
},
{
'options': [
'--environment_options=docker_container_image=foo',
'--environment_options=process_command=foo',
'--environment_options=process_variables=foo=bar',
'--environment_options=external_service_address=foo',
],
'errors': [
'docker_container_image',
'process_command',
'process_variables',
'external_service_address'
]
},
]
errors = []
for case in test_cases:
validator = PipelineOptionsValidator(
PipelineOptions(case['options']), MockRunners.OtherRunner())
validation_result = validator.validate()
validation_errors = self.check_errors_for_arguments(
validation_result, case['errors'])
if validation_errors:
errors.append(
'Options "%s" had unexpected validation results: "%s"' %
(' '.join(case['options']), ' '.join(validation_errors)))
self.assertEqual(errors, [])
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()