blob: cd6cce204b782d9aeb743d88d25b09fdd46b6a02 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Unit tests for the pipeline options module."""
# pytype: skip-file
import json
import logging
import os
import unittest
import hamcrest as hc
import mock
from parameterized import parameterized
from apache_beam.options.pipeline_options import CrossLanguageOptions
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import JobServerOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.options.pipeline_options import _BeamArgumentParser
from apache_beam.options.pipeline_options_validator import PipelineOptionsValidator
from apache_beam.options.value_provider import RuntimeValueProvider
from apache_beam.options.value_provider import StaticValueProvider
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
_LOGGER = logging.getLogger(__name__)
try:
import apache_beam.io.gcp.gcsio # pylint: disable=unused-import
has_gcsio = True
except ImportError:
has_gcsio = False
# Mock runners to use for validations.
class MockRunners(object):
class DataflowRunner(object):
def get_default_gcp_region(self):
# Return a default so we don't have to specify --region in every test
# (unless specifically testing it).
return 'us-central1'
class MockGoogleCloudOptionsNoBucket(GoogleCloudOptions):
def _create_default_gcs_bucket(self):
return None
class MockGoogleCloudOptionsWithBucket(GoogleCloudOptions):
def _create_default_gcs_bucket(self):
return "gs://default/bucket"
class PipelineOptionsTest(unittest.TestCase):
def setUp(self):
# Reset runtime options to avoid side-effects caused by other tests.
# Note that is_accessible assertions require runtime_options to
# be uninitialized.
RuntimeValueProvider.set_runtime_options(None)
def tearDown(self):
# Reset runtime options to avoid side-effects in other tests.
RuntimeValueProvider.set_runtime_options(None)
TEST_CASES = [
(['--num_workers', '5'],
{
'num_workers': 5,
'mock_flag': False,
'mock_option': None,
'mock_multi_option': None
}, [DisplayDataItemMatcher('num_workers', 5)]),
(['--direct_num_workers', '5'],
{
'direct_num_workers': 5,
'mock_flag': False,
'mock_option': None,
'mock_multi_option': None
}, [DisplayDataItemMatcher('direct_num_workers', 5)]),
(['--direct_running_mode', 'multi_threading'],
{
'direct_running_mode': 'multi_threading',
'mock_flag': False,
'mock_option': None,
'mock_multi_option': None
}, [DisplayDataItemMatcher('direct_running_mode', 'multi_threading')]),
(['--direct_running_mode', 'multi_processing'],
{
'direct_running_mode': 'multi_processing',
'mock_flag': False,
'mock_option': None,
'mock_multi_option': None
}, [DisplayDataItemMatcher('direct_running_mode', 'multi_processing')]),
(['--profile_cpu', '--profile_location', 'gs://bucket/', 'ignored'],
{
'profile_cpu': True,
'profile_location': 'gs://bucket/',
'mock_flag': False,
'mock_option': None,
'mock_multi_option': None
},
[
DisplayDataItemMatcher('profile_cpu', True),
DisplayDataItemMatcher('profile_location', 'gs://bucket/')
]),
(['--num_workers', '5', '--mock_flag'],
{
'num_workers': 5,
'mock_flag': True,
'mock_option': None,
'mock_multi_option': None
},
[
DisplayDataItemMatcher('num_workers', 5),
DisplayDataItemMatcher('mock_flag', True)
]),
(['--mock_option', 'abc'], {
'mock_flag': False, 'mock_option': 'abc', 'mock_multi_option': None
}, [DisplayDataItemMatcher('mock_option', 'abc')]),
(['--mock_option', ' abc def '],
{
'mock_flag': False,
'mock_option': ' abc def ',
'mock_multi_option': None
}, [DisplayDataItemMatcher('mock_option', ' abc def ')]),
(['--mock_option= abc xyz '],
{
'mock_flag': False,
'mock_option': ' abc xyz ',
'mock_multi_option': None
}, [DisplayDataItemMatcher('mock_option', ' abc xyz ')]),
([
'--mock_option=gs://my bucket/my folder/my file',
'--mock_multi_option=op1',
'--mock_multi_option=op2'
],
{
'mock_flag': False,
'mock_option': 'gs://my bucket/my folder/my file',
'mock_multi_option': ['op1', 'op2']
},
[
DisplayDataItemMatcher(
'mock_option', 'gs://my bucket/my folder/my file'),
DisplayDataItemMatcher('mock_multi_option', ['op1', 'op2'])
]),
(['--mock_multi_option=op1', '--mock_multi_option=op2'],
{
'mock_flag': False,
'mock_option': None,
'mock_multi_option': ['op1', 'op2']
}, [DisplayDataItemMatcher('mock_multi_option', ['op1', 'op2'])]),
(['--mock_json_option={"11a": 0, "37a": 1}'],
{
'mock_flag': False,
'mock_option': None,
'mock_multi_option': None,
'mock_json_option': {
'11a': 0, '37a': 1
},
}, [DisplayDataItemMatcher('mock_json_option', {
'11a': 0, '37a': 1
})]),
]
# Used for testing newly added flags.
class MockOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--mock_flag', action='store_true', help='mock flag')
parser.add_argument('--mock_option', help='mock option')
parser.add_argument(
'--mock_multi_option', action='append', help='mock multi option')
parser.add_argument('--option with space', help='mock option with space')
parser.add_argument('--mock_json_option', type=json.loads, default={})
# Use with MockOptions in test cases where multiple option classes are needed.
class FakeOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--fake_flag', action='store_true', help='fake flag')
parser.add_argument('--fake_option', help='fake option')
parser.add_argument(
'--fake_multi_option', action='append', help='fake multi option')
@parameterized.expand(TEST_CASES)
def test_display_data(self, flags, _, display_data):
options = PipelineOptions(flags=flags)
dd = DisplayData.create_from(options)
hc.assert_that(dd.items, hc.contains_inanyorder(*display_data))
@parameterized.expand(TEST_CASES)
def test_get_all_options_subclass(self, flags, expected, _):
options = PipelineOptionsTest.MockOptions(flags=flags)
self.assertLessEqual(expected.items(), options.get_all_options().items())
self.assertEqual(
options.view_as(PipelineOptionsTest.MockOptions).mock_flag,
expected['mock_flag'])
self.assertEqual(
options.view_as(PipelineOptionsTest.MockOptions).mock_option,
expected['mock_option'])
self.assertEqual(
options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option,
expected['mock_multi_option'])
@parameterized.expand(TEST_CASES)
def test_get_all_options(self, flags, expected, _):
options = PipelineOptions(flags=flags)
self.assertLessEqual(expected.items(), options.get_all_options().items())
self.assertEqual(
options.view_as(PipelineOptionsTest.MockOptions).mock_flag,
expected['mock_flag'])
self.assertEqual(
options.view_as(PipelineOptionsTest.MockOptions).mock_option,
expected['mock_option'])
self.assertEqual(
options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option,
expected['mock_multi_option'])
@parameterized.expand(TEST_CASES)
def test_subclasses_of_pipeline_options_can_be_instantiated(
self, flags, expected, _):
mock_options = PipelineOptionsTest.MockOptions(flags=flags)
self.assertEqual(mock_options.mock_flag, expected['mock_flag'])
self.assertEqual(mock_options.mock_option, expected['mock_option'])
self.assertEqual(
mock_options.mock_multi_option, expected['mock_multi_option'])
@parameterized.expand(TEST_CASES)
def test_views_can_be_constructed_from_pipeline_option_subclasses(
self, flags, expected, _):
fake_options = PipelineOptionsTest.FakeOptions(flags=flags)
mock_options = fake_options.view_as(PipelineOptionsTest.MockOptions)
self.assertEqual(mock_options.mock_flag, expected['mock_flag'])
self.assertEqual(mock_options.mock_option, expected['mock_option'])
self.assertEqual(
mock_options.mock_multi_option, expected['mock_multi_option'])
def test_views_do_not_expose_options_defined_by_other_views(self):
flags = ['--mock_option=mock_value', '--fake_option=fake_value']
options = PipelineOptions(flags)
assert options.view_as(
PipelineOptionsTest.MockOptions).mock_option == 'mock_value'
assert options.view_as(
PipelineOptionsTest.FakeOptions).fake_option == 'fake_value'
assert options.view_as(PipelineOptionsTest.MockOptions).view_as(
PipelineOptionsTest.FakeOptions).fake_option == 'fake_value'
self.assertRaises(
AttributeError,
lambda: options.view_as(PipelineOptionsTest.MockOptions).fake_option)
self.assertRaises(
AttributeError,
lambda: options.view_as(PipelineOptionsTest.MockOptions).view_as(
PipelineOptionsTest.FakeOptions).view_as(
PipelineOptionsTest.MockOptions).fake_option)
@parameterized.expand(TEST_CASES)
def test_from_dictionary(self, flags, expected, _):
options = PipelineOptions(flags=flags)
all_options_dict = options.get_all_options()
options_from_dict = PipelineOptions.from_dictionary(all_options_dict)
self.assertEqual(
options_from_dict.view_as(PipelineOptionsTest.MockOptions).mock_flag,
expected['mock_flag'])
self.assertEqual(
options.view_as(PipelineOptionsTest.MockOptions).mock_option,
expected['mock_option'])
self.assertEqual(
options.view_as(PipelineOptionsTest.MockOptions).mock_multi_option,
expected['mock_multi_option'])
self.assertEqual(
options.view_as(PipelineOptionsTest.MockOptions).mock_json_option,
expected.get('mock_json_option', {}))
def test_none_from_dictionary(self):
class NoneDefaultOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--test_arg_none', default=None, type=int)
parser.add_argument('--test_arg_int', default=1, type=int)
options_dict = {'test_arg_none': None, 'test_arg_int': 5}
options_from_dict = NoneDefaultOptions.from_dictionary(options_dict)
result = options_from_dict.get_all_options()
self.assertEqual(result['test_arg_int'], 5)
self.assertEqual(result['test_arg_none'], None)
def test_from_kwargs(self):
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--test_arg')
# kwarg takes precedence over parsed flag
options = PipelineOptions(flags=['--test_arg=A'], test_arg='B')
self.assertEqual(options.view_as(MyOptions).test_arg, 'B')
self.assertEqual(options.get_all_options()['test_arg'], 'B')
def test_option_with_space(self):
options = PipelineOptions(flags=['--option with space= value with space'])
self.assertEqual(
getattr(
options.view_as(PipelineOptionsTest.MockOptions),
'option with space'),
' value with space')
options_from_dict = PipelineOptions.from_dictionary(
options.get_all_options())
self.assertEqual(
getattr(
options_from_dict.view_as(PipelineOptionsTest.MockOptions),
'option with space'),
' value with space')
def test_retain_unknown_options_binary_store_string(self):
options = PipelineOptions(['--unknown_option', 'some_value'])
result = options.get_all_options(retain_unknown_options=True)
self.assertEqual(result['unknown_option'], 'some_value')
def test_retain_unknown_options_binary_equals_store_string(self):
options = PipelineOptions(['--unknown_option=some_value'])
result = options.get_all_options(retain_unknown_options=True)
self.assertEqual(result['unknown_option'], 'some_value')
def test_retain_unknown_options_binary_multi_equals_store_string(self):
options = PipelineOptions(['--unknown_option=expr = "2 + 2 = 5"'])
result = options.get_all_options(retain_unknown_options=True)
self.assertEqual(result['unknown_option'], 'expr = "2 + 2 = 5"')
def test_retain_unknown_options_binary_single_dash_store_string(self):
options = PipelineOptions(['-i', 'some_value'])
with self.assertRaises(KeyError):
_ = options.get_all_options(retain_unknown_options=True)['i']
def test_retain_unknown_options_unary_store_true(self):
options = PipelineOptions(['--unknown_option'])
result = options.get_all_options(retain_unknown_options=True)
self.assertEqual(result['unknown_option'], True)
def test_retain_unknown_options_consecutive_unary_store_true(self):
options = PipelineOptions(['--option_foo', '--option_bar'])
result = options.get_all_options(retain_unknown_options=True)
self.assertEqual(result['option_foo'], True)
self.assertEqual(result['option_bar'], True)
def test_retain_unknown_options_unary_single_dash_store_true(self):
options = PipelineOptions(['-i'])
result = options.get_all_options(retain_unknown_options=True)
self.assertEqual(result['i'], True)
def test_override_options(self):
base_flags = ['--num_workers', '5']
options = PipelineOptions(base_flags)
self.assertEqual(options.get_all_options()['num_workers'], 5)
self.assertEqual(options.get_all_options()['mock_flag'], False)
options.view_as(PipelineOptionsTest.MockOptions).mock_flag = True
self.assertEqual(options.get_all_options()['num_workers'], 5)
self.assertTrue(options.get_all_options()['mock_flag'])
def test_override_init_options(self):
base_flags = ['--num_workers', '5']
options = PipelineOptions(base_flags, mock_flag=True)
self.assertEqual(options.get_all_options()['num_workers'], 5)
self.assertEqual(options.get_all_options()['mock_flag'], True)
def test_invalid_override_init_options(self):
base_flags = ['--num_workers', '5']
options = PipelineOptions(base_flags, mock_invalid_flag=True)
self.assertEqual(options.get_all_options()['num_workers'], 5)
self.assertEqual(options.get_all_options()['mock_flag'], False)
def test_experiments(self):
options = PipelineOptions(['--experiment', 'abc', '--experiment', 'def'])
self.assertEqual(
sorted(options.get_all_options()['experiments']), ['abc', 'def'])
options = PipelineOptions(['--experiments', 'abc', '--experiments', 'def'])
self.assertEqual(
sorted(options.get_all_options()['experiments']), ['abc', 'def'])
options = PipelineOptions(flags=[''])
self.assertEqual(options.get_all_options()['experiments'], None)
def test_worker_options(self):
options = PipelineOptions([
'--machine_type',
'abc',
'--disk_type',
'def',
'--element_processing_timeout_minutes',
'10',
])
worker_options = options.view_as(WorkerOptions)
self.assertEqual(worker_options.machine_type, 'abc')
self.assertEqual(worker_options.disk_type, 'def')
self.assertEqual(worker_options.element_processing_timeout_minutes, 10)
options = PipelineOptions(
['--worker_machine_type', 'abc', '--worker_disk_type', 'def'])
worker_options = options.view_as(WorkerOptions)
self.assertEqual(worker_options.machine_type, 'abc')
self.assertEqual(worker_options.disk_type, 'def')
def test_beam_services_empty(self):
with mock.patch.dict(os.environ, {}, clear=True):
options = PipelineOptions().view_as(CrossLanguageOptions)
self.assertEqual(options.beam_services, {})
def test_beam_services_from_env(self):
with mock.patch.dict(os.environ,
{'BEAM_SERVICE_OVERRIDES': '{"foo": "bar"}'},
clear=True):
options = PipelineOptions().view_as(CrossLanguageOptions)
self.assertEqual(options.beam_services, {'foo': 'bar'})
def test_beam_services_from_flag(self):
with mock.patch.dict(os.environ, {}, clear=True):
options = PipelineOptions(['--beam_services={"foo": "bar"}'
]).view_as(CrossLanguageOptions)
self.assertEqual(options.beam_services, {'foo': 'bar'})
def test_beam_services_from_env_and_flag(self):
with mock.patch.dict(
os.environ,
{'BEAM_SERVICE_OVERRIDES': '{"foo": "bar", "other": "zzz"}'},
clear=True):
options = PipelineOptions(['--beam_services={"foo": "override"}'
]).view_as(CrossLanguageOptions)
self.assertEqual(
options.beam_services, {
'foo': 'override', 'other': 'zzz'
})
def test_option_modifications_are_shared_between_views(self):
pipeline_options = PipelineOptions([
'--mock_option',
'value',
'--mock_flag',
'--mock_multi_option',
'value1',
'--mock_multi_option',
'value2',
])
mock_options = PipelineOptionsTest.MockOptions([
'--mock_option',
'value',
'--mock_flag',
'--mock_multi_option',
'value1',
'--mock_multi_option',
'value2',
])
for options in [pipeline_options, mock_options]:
view1 = options.view_as(PipelineOptionsTest.MockOptions)
view2 = options.view_as(PipelineOptionsTest.MockOptions)
view1.mock_option = 'new_value'
view1.mock_flag = False
view1.mock_multi_option.append('value3')
view3 = options.view_as(PipelineOptionsTest.MockOptions)
view4 = view1.view_as(PipelineOptionsTest.MockOptions)
view5 = options.view_as(TypeOptions).view_as(
PipelineOptionsTest.MockOptions)
for view in [view1, view2, view3, view4, view5]:
self.assertEqual('new_value', view.mock_option)
self.assertFalse(view.mock_flag)
self.assertEqual(['value1', 'value2', 'value3'], view.mock_multi_option)
def test_uninitialized_option_modifications_are_shared_between_views(self):
options = PipelineOptions([])
view1 = options.view_as(PipelineOptionsTest.MockOptions)
view2 = options.view_as(PipelineOptionsTest.MockOptions)
view1.mock_option = 'some_value'
view1.mock_flag = False
view1.mock_multi_option = ['value1', 'value2']
view3 = options.view_as(PipelineOptionsTest.MockOptions)
view4 = view1.view_as(PipelineOptionsTest.MockOptions)
view5 = options.view_as(TypeOptions).view_as(
PipelineOptionsTest.MockOptions)
for view in [view1, view2, view3, view4, view5]:
self.assertEqual('some_value', view.mock_option)
self.assertFalse(view.mock_flag)
self.assertEqual(['value1', 'value2'], view.mock_multi_option)
def test_extra_package(self):
options = PipelineOptions([
'--extra_package',
'abc',
'--extra_packages',
'def',
'--extra_packages',
'ghi'
])
self.assertEqual(
sorted(options.get_all_options()['extra_packages']),
['abc', 'def', 'ghi'])
options = PipelineOptions(flags=[''])
self.assertEqual(options.get_all_options()['extra_packages'], None)
def test_dataflow_job_file(self):
options = PipelineOptions(['--dataflow_job_file', 'abc'])
self.assertEqual(options.get_all_options()['dataflow_job_file'], 'abc')
options = PipelineOptions(flags=[''])
self.assertEqual(options.get_all_options()['dataflow_job_file'], None)
def test_template_location(self):
options = PipelineOptions(['--template_location', 'abc'])
self.assertEqual(options.get_all_options()['template_location'], 'abc')
options = PipelineOptions(flags=[''])
self.assertEqual(options.get_all_options()['template_location'], None)
def test_redefine_options(self):
class TestRedefinedOptions(PipelineOptions): # pylint: disable=unused-variable
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--redefined_flag', action='store_true')
class TestRedefinedOptions(PipelineOptions): # pylint: disable=function-redefined
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--redefined_flag', action='store_true')
options = PipelineOptions(['--redefined_flag'])
self.assertTrue(options.get_all_options()['redefined_flag'])
# TODO(https://github.com/apache/beam/issues/18197): Require unique names
# only within a test. For now, <file name acronym>_vp_arg<number> will be
# the convention to name value-provider arguments in tests, as opposed to
# <file name acronym>_non_vp_arg<number> for non-value-provider arguments.
# The number will grow per file as tests are added.
def test_value_provider_options(self):
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--pot_vp_arg1', help='This flag is a value provider')
parser.add_value_provider_argument('--pot_vp_arg2', default=1, type=int)
parser.add_argument('--pot_non_vp_arg1', default=1, type=int)
# Provide values: if not provided, the option becomes of the type runtime vp
options = UserOptions(['--pot_vp_arg1', 'hello'])
self.assertIsInstance(options.pot_vp_arg1, StaticValueProvider)
self.assertIsInstance(options.pot_vp_arg2, RuntimeValueProvider)
self.assertIsInstance(options.pot_non_vp_arg1, int)
# Values can be overwritten
options = UserOptions(
pot_vp_arg1=5,
pot_vp_arg2=StaticValueProvider(value_type=str, value='bye'),
pot_non_vp_arg1=RuntimeValueProvider(
option_name='foo', value_type=int, default_value=10))
self.assertEqual(options.pot_vp_arg1, 5)
self.assertTrue(
options.pot_vp_arg2.is_accessible(),
'%s is not accessible' % options.pot_vp_arg2)
self.assertEqual(options.pot_vp_arg2.get(), 'bye')
self.assertFalse(options.pot_non_vp_arg1.is_accessible())
with self.assertRaises(RuntimeError):
options.pot_non_vp_arg1.get()
# Converts extra arguments to list value.
def test_extra_args(self):
options = PipelineOptions([
'--extra_arg',
'val1',
'--extra_arg',
'val2',
'--extra_arg=val3',
'--unknown_arg',
'val4'
])
def add_extra_options(parser):
parser.add_argument("--extra_arg", action='append')
self.assertEqual(
options.get_all_options(
add_extra_args_fn=add_extra_options)['extra_arg'],
['val1', 'val2', 'val3'])
# The argparse package by default tries to autocomplete option names. This
# results in an "ambiguous option" error from argparse when an unknown option
# matching multiple known ones are used. This tests that we suppress this
# error.
def test_unknown_option_prefix(self):
# Test that the "ambiguous option" error is suppressed.
options = PipelineOptions(['--profi', 'val1'])
options.view_as(ProfilingOptions)
# Test that valid errors are not suppressed.
with self.assertRaises(SystemExit):
# Invalid option choice.
options = PipelineOptions(['--type_check_strictness', 'blahblah'])
options.view_as(TypeOptions)
def test_add_experiment(self):
options = PipelineOptions([])
options.view_as(DebugOptions).add_experiment('new_experiment')
self.assertEqual(['new_experiment'],
options.view_as(DebugOptions).experiments)
def test_add_experiment_preserves_existing_experiments(self):
options = PipelineOptions(['--experiment=existing_experiment'])
options.view_as(DebugOptions).add_experiment('new_experiment')
self.assertEqual(['existing_experiment', 'new_experiment'],
options.view_as(DebugOptions).experiments)
def test_lookup_experiments(self):
options = PipelineOptions([
'--experiment=existing_experiment',
'--experiment',
'key=value',
'--experiment',
'master_key=k1=v1,k2=v2',
])
debug_options = options.view_as(DebugOptions)
self.assertEqual(
'default_value',
debug_options.lookup_experiment('nonexistent', 'default_value'))
self.assertEqual(
'value', debug_options.lookup_experiment('key', 'default_value'))
self.assertEqual(
'k1=v1,k2=v2', debug_options.lookup_experiment('master_key'))
self.assertEqual(
True, debug_options.lookup_experiment('existing_experiment'))
def test_transform_name_mapping(self):
options = PipelineOptions(['--transform_name_mapping={\"from\":\"to\"}'])
mapping = options.view_as(GoogleCloudOptions).transform_name_mapping
self.assertEqual(mapping['from'], 'to')
def test_jar_cache_dir(self):
options = PipelineOptions(['--jar_cache_dir=/path/to/jar_cache_dir'])
jar_cache_dir = options.view_as(JobServerOptions).jar_cache_dir
self.assertEqual(jar_cache_dir, '/path/to/jar_cache_dir')
def test_dataflow_service_options(self):
options = PipelineOptions([
'--dataflow_service_option',
'whizz=bang',
'--dataflow_service_option',
'beep=boop'
])
self.assertEqual(
sorted(options.get_all_options()['dataflow_service_options']),
['beep=boop', 'whizz=bang'])
options = PipelineOptions([
'--dataflow_service_options',
'whizz=bang',
'--dataflow_service_options',
'beep=boop'
])
self.assertEqual(
sorted(options.get_all_options()['dataflow_service_options']),
['beep=boop', 'whizz=bang'])
options = PipelineOptions(flags=[''])
self.assertEqual(
options.get_all_options()['dataflow_service_options'], None)
def test_options_store_false_with_different_dest(self):
parser = _BeamArgumentParser()
for cls in PipelineOptions.__subclasses__():
cls._add_argparse_args(parser)
actions = parser._actions.copy()
options_to_flags = {}
options_diff_dest_store_true = {}
for i in range(len(actions)):
flag_names = actions[i].option_strings
option_name = actions[i].dest
if isinstance(actions[i].const, bool):
for flag_name in flag_names:
flag_name = flag_name.strip('-')
if flag_name != option_name:
# Capture flags which has store_action=True and has a
# different dest. This behavior would be confusing.
if actions[i].const:
options_diff_dest_store_true[flag_name] = option_name
continue
# check the flags like no_use_public_ips
# default is None, action is {True, False}
if actions[i].default is None:
options_to_flags[option_name] = flag_name
self.assertEqual(
len(options_diff_dest_store_true),
0,
_LOGGER.error(
"There should be no flags that have a dest "
"different from flag name and action as "
"store_true. It would be confusing "
"to the user. Please specify the dest as the "
"flag_name instead."))
from apache_beam.options.pipeline_options import (
_FLAG_THAT_SETS_FALSE_VALUE)
self.assertDictEqual(
_FLAG_THAT_SETS_FALSE_VALUE,
options_to_flags,
"If you are adding a new boolean flag with default=None,"
" with different dest/option_name from the flag name, please add "
"the dest and the flag name to the map "
"_FLAG_THAT_SETS_FALSE_VALUE in PipelineOptions.py")
def test_gcs_custom_audit_entries(self):
options = PipelineOptions([
'--gcs_custom_audit_entry=user=test-user',
'--gcs_custom_audit_entry=work=test-work',
'--gcs_custom_audit_entries={"job":"test-job", "id":"1234"}'
])
entries = options.view_as(GoogleCloudOptions).gcs_custom_audit_entries
self.assertDictEqual(
entries,
{
'x-goog-custom-audit-user': 'test-user',
'x-goog-custom-audit-work': 'test-work',
'x-goog-custom-audit-job': 'test-job',
'x-goog-custom-audit-id': '1234'
})
def test_gcs_custom_audit_entries_wo_duplicated_prefix(self):
options = PipelineOptions([
'--gcs_custom_audit_entry=x-goog-custom-audit-user=test-user',
'--gcs_custom_audit_entries={"job":"test-job", "id":"1234"}'
])
entries = options.view_as(GoogleCloudOptions).gcs_custom_audit_entries
self.assertDictEqual(
entries,
{
'x-goog-custom-audit-user': 'test-user',
'x-goog-custom-audit-job': 'test-job',
'x-goog-custom-audit-id': '1234'
})
@mock.patch('apache_beam.options.pipeline_options._BeamArgumentParser.error')
def test_gcs_custom_audit_entries_with_errors(self, mock_error):
long_key = 'a' * 65
options = PipelineOptions([f'--gcs_custom_audit_entry={long_key}=1'])
_ = options.view_as(GoogleCloudOptions).gcs_custom_audit_entries
self.assertRegex(
mock_error.call_args[0][0],
'The key .* exceeds the 64-character limit.')
mock_error.reset_mock()
long_value = 'b' * 1201
options = PipelineOptions([f'--gcs_custom_audit_entry=key={long_value}'])
_ = options.view_as(GoogleCloudOptions).gcs_custom_audit_entries
self.assertRegex(
mock_error.call_args[0][0],
'The value .* exceeds the 1200-character limit.')
mock_error.reset_mock()
options = PipelineOptions([
'--gcs_custom_audit_entry=a=1',
'--gcs_custom_audit_entry=b=2',
'--gcs_custom_audit_entry=c=3',
'--gcs_custom_audit_entry=d=4',
'--gcs_custom_audit_entry=job=test-job'
])
_ = options.view_as(GoogleCloudOptions).gcs_custom_audit_entries
self.assertRegex(
mock_error.call_args[0][0],
'The maximum allowed number of GCS custom audit entries .*')
mock_error.reset_mock()
options = PipelineOptions([
'--gcs_custom_audit_entry=a=1',
'--gcs_custom_audit_entry=b=2',
'--gcs_custom_audit_entry=c=3',
'--gcs_custom_audit_entry=d=4'
])
_ = options.view_as(GoogleCloudOptions).gcs_custom_audit_entries
self.assertRegex(
mock_error.call_args[0][0],
'The maximum allowed number of GCS custom audit entries .*')
def _check_errors(self, options, validator, expected):
if has_gcsio:
with mock.patch('apache_beam.io.gcp.gcsio.GcsIO.is_soft_delete_enabled',
return_value=False):
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, expected)
else:
errors = options._handle_temp_and_staging_locations(validator)
self.assertEqual(errors, expected)
def test_validation_good_stg_good_temp(self):
runner = MockRunners.DataflowRunner()
options = GoogleCloudOptions([
'--project=myproject',
'--staging_location=gs://beam/stg',
'--temp_location=gs://beam/tmp'
])
validator = PipelineOptionsValidator(options, runner)
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/stg")
self.assertEqual(
options.get_all_options()['temp_location'], "gs://beam/tmp")
def test_validation_bad_stg_good_temp(self):
runner = MockRunners.DataflowRunner()
options = GoogleCloudOptions([
'--project=myproject',
'--staging_location=badGSpath',
'--temp_location=gs://beam/tmp'
])
validator = PipelineOptionsValidator(options, runner)
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/tmp")
self.assertEqual(
options.get_all_options()['temp_location'], "gs://beam/tmp")
def test_validation_good_stg_bad_temp(self):
runner = MockRunners.DataflowRunner()
options = GoogleCloudOptions([
'--project=myproject',
'--staging_location=gs://beam/stg',
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/stg")
self.assertEqual(
options.get_all_options()['temp_location'], "gs://beam/stg")
def test_validation_bad_stg_bad_temp_with_default(self):
runner = MockRunners.DataflowRunner()
options = MockGoogleCloudOptionsWithBucket([
'--project=myproject',
'--staging_location=badGSpath',
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://default/bucket")
self.assertEqual(
options.get_all_options()['temp_location'], "gs://default/bucket")
def test_validation_bad_stg_bad_temp_no_default(self):
runner = MockRunners.DataflowRunner()
options = MockGoogleCloudOptionsNoBucket([
'--project=myproject',
'--staging_location=badGSpath',
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
self._check_errors(
options,
validator,
[
'Invalid GCS path (badGSpath), given for the option: ' \
'temp_location.',
'Invalid GCS path (badGSpath), given for the option: ' \
'staging_location.'
])
def test_comma_separated_experiments(self):
"""Test that comma-separated experiments are parsed correctly."""
# Test single experiment
options = PipelineOptions(['--experiments=abc'])
self.assertEqual(['abc'], options.get_all_options()['experiments'])
# Test comma-separated experiments
options = PipelineOptions(['--experiments=abc,def,ghi'])
self.assertEqual(['abc', 'def', 'ghi'],
options.get_all_options()['experiments'])
# Test multiple flags with comma-separated values
options = PipelineOptions(
['--experiments=abc,def', '--experiments=ghi,jkl'])
self.assertEqual(['abc', 'def', 'ghi', 'jkl'],
options.get_all_options()['experiments'])
# Test with spaces around commas
options = PipelineOptions(['--experiments=abc, def , ghi'])
self.assertEqual(['abc', 'def', 'ghi'],
options.get_all_options()['experiments'])
# Test empty values are filtered out
options = PipelineOptions(['--experiments=abc,,def,'])
self.assertEqual(['abc', 'def'], options.get_all_options()['experiments'])
def test_comma_separated_dataflow_service_options(self):
"""Test that comma-separated dataflow service options are parsed
correctly."""
# Test single option
options = PipelineOptions(['--dataflow_service_options=option1=value1'])
self.assertEqual(['option1=value1'],
options.get_all_options()['dataflow_service_options'])
# Test comma-separated options
options = PipelineOptions([
'--dataflow_service_options=option1=value1,option2=value2,'
'option3=value3'
])
self.assertEqual(['option1=value1', 'option2=value2', 'option3=value3'],
options.get_all_options()['dataflow_service_options'])
# Test multiple flags with comma-separated values
options = PipelineOptions([
'--dataflow_service_options=option1=value1,option2=value2',
'--dataflow_service_options=option3=value3,option4=value4'
])
self.assertEqual([
'option1=value1', 'option2=value2', 'option3=value3', 'option4=value4'
],
options.get_all_options()['dataflow_service_options'])
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()