| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import os |
| from unittest import TestCase, mock |
| |
| from liminal.core.config.config import ConfigUtil |
| |
| |
| # noinspection PyUnresolvedReferences,DuplicatedCode |
| class TestConfigUtil(TestCase): |
| |
| @mock.patch('liminal.core.util.files_util.load') |
| def test_safe_load(self, find_config_files_mock): |
| subliminal = { |
| 'name': 'my_subliminal_test', |
| 'type': 'sub', |
| 'super': 'my_superliminal_test', |
| 'images': [{ |
| 'image': 'my_image' |
| }], |
| 'pipelines': [ |
| {'name': 'mypipe1', 'param': 'constant'}, |
| {'name': 'mypipe2', 'param': 'constant'} |
| ], |
| 'pipeline_defaults': { |
| 'param1': 'param1_value' |
| }, |
| 'task_defaults': { |
| 'job_start': { |
| 'task_sub_def': 'task_sub_def_value' |
| } |
| } |
| } |
| superliminal = { |
| 'name': 'my_superliminal_test', |
| 'type': 'super', |
| 'super': 'super_superliminal', |
| 'images': [{ |
| 'image': 'my_image', |
| 'source': '.' |
| }], |
| 'pipeline_defaults': { |
| 'param2': 'param2super_value', |
| 'param3': 'param3super_value' |
| }, |
| 'task_defaults': { |
| 'job_start': { |
| 'task_def1': 'task_def1_value', |
| 'task_def2': { |
| 'task_def2_1': 'task_def2_1_value', |
| } |
| } |
| } |
| } |
| super_superliminal = { |
| 'name': 'super_superliminal', |
| 'type': 'super', |
| 'pipeline_defaults': { |
| 'param2': 'param2super_value', |
| 'param3': 'param3hyper_value', |
| 'param4': 'param4hyper_value' |
| } |
| } |
| |
| expected = [{ |
| 'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'}], |
| 'name': 'my_subliminal_test', |
| 'pipeline_defaults': {'param1': 'param1_value'}, |
| 'pipelines': [{'description': 'add defaults parameters for all pipelines', |
| 'name': 'mypipe1', |
| 'param': 'constant', |
| 'param1': 'param1_value', |
| 'param2': 'param2super_value', |
| 'param3': 'param3super_value', |
| 'param4': 'param4hyper_value', |
| 'tasks': [{'task': 'start', |
| 'task_def1': 'task_def1_value', |
| 'task_def2': {'task_def2_1': 'task_def2_1_value'}, |
| 'task_sub_def': 'task_sub_def_value', |
| 'type': 'job_start'}, |
| {'task': 'end', 'type': 'job_end'}]}, |
| {'description': 'add defaults parameters for all pipelines', |
| 'name': 'mypipe2', |
| 'param': 'constant', |
| 'param1': 'param1_value', |
| 'param2': 'param2super_value', |
| 'param3': 'param3super_value', |
| 'param4': 'param4hyper_value', |
| 'tasks': [{'task': 'start', |
| 'task_def1': 'task_def1_value', |
| 'task_def2': {'task_def2_1': 'task_def2_1_value'}, |
| 'task_sub_def': 'task_sub_def_value', |
| 'type': 'job_start'}, |
| {'task': 'end', 'type': 'job_end'}]}], |
| 'service_defaults': {'description': 'add defaults parameters for all ' |
| 'services'}, |
| 'images': [{'image': 'my_image', 'source': '.'}], |
| 'services': [], |
| 'super': 'my_superliminal_test', |
| 'task_defaults': {'job_start': {'task_sub_def': 'task_sub_def_value'}}, |
| 'type': 'sub' |
| }] |
| |
| find_config_files_mock.return_value = { |
| 'my_subliminal_test': subliminal, |
| 'my_superliminal_test': superliminal, |
| 'super_superliminal': super_superliminal |
| } |
| |
| config_util = ConfigUtil('') |
| |
| self.assertEqual(expected, config_util.safe_load(is_render_variables=True)) |
| |
| # validate cache |
| self.assertEqual(expected, config_util.loaded_subliminals) |
| |
| @mock.patch('liminal.core.util.files_util.load') |
| def test_get_config(self, find_config_files_mock): |
| find_config_files_mock.return_value = { |
| 'my_subliminal_test': { |
| 'type': 'sub' |
| }, |
| 'my_superliminal_test': { |
| 'type': 'super' |
| } |
| } |
| |
| config_util = ConfigUtil('') |
| |
| self.assertEqual({'type': 'sub'}, |
| config_util._ConfigUtil__get_config('my_subliminal_test')) |
| |
| self.assertEqual({'type': 'super'}, |
| config_util._ConfigUtil__get_config('my_superliminal_test')) |
| |
| @mock.patch('liminal.core.util.files_util.load') |
| def test_get_superliminal(self, find_config_files_mock): |
| base = {'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'}], |
| 'name': 'base', |
| 'pipeline_defaults': {'after_tasks': [{'task': 'end', 'type': 'job_end'}], |
| 'before_tasks': [{'task': 'start', 'type': 'job_start'}], |
| 'description': 'add defaults parameters for all ' |
| 'pipelines'}, |
| 'service_defaults': {'description': 'add defaults parameters for all ' |
| 'services'}, |
| 'task_defaults': {'description': 'add defaults parameters for all tasks ' |
| 'separate by task type', |
| 'python': {'executor': 'default_k8s'}}, |
| 'type': 'super'} |
| subliminal = { |
| 'name': 'subliminal_test', |
| 'type': 'sub' |
| } |
| |
| find_config_files_mock.return_value = { |
| 'subliminal_test': subliminal |
| } |
| |
| config_util = ConfigUtil('') |
| |
| self.assertEqual(base, |
| config_util._ConfigUtil__get_superliminal(subliminal, False)) |
| |
| self.assertEqual({}, |
| config_util._ConfigUtil__get_superliminal(base, False)) |
| |
| liminal = { |
| 'name': 'subliminal_test', |
| 'type': 'sub', |
| 'super': 'my_superliminal' |
| } |
| |
| with self.assertRaises(FileNotFoundError): |
| config_util._ConfigUtil__get_superliminal(liminal, False) |
| |
| @mock.patch('liminal.core.util.files_util.load') |
| def test_merge_superliminals(self, find_config_files_mock): |
| superliminal = { |
| 'name': 'my_superliminal_test', |
| 'type': 'super', |
| 'super': 'super_superliminal', |
| 'pipeline_defaults': { |
| 'before_tasks': [ |
| {'task': 'start-2', 'type': 'spark'}, |
| ], |
| 'after_tasks': [ |
| {'task': 'end-1', 'type': 'spark'} |
| ] |
| }, |
| 'task_defaults': { |
| 'task_def1': 'task_def1_value' |
| } |
| } |
| |
| super_superliminal = { |
| 'name': 'super_superliminal', |
| 'type': 'super', |
| 'pipeline_defaults': { |
| 'before_tasks': [ |
| {'task': 'start-1', 'type': 'spark'}], |
| 'after_tasks': [ |
| {'task': 'end-2', 'type': 'spark'} |
| ] |
| } |
| } |
| |
| config_util = ConfigUtil('') |
| |
| find_config_files_mock.return_value = { |
| 'super_superliminal': super_superliminal, |
| 'superliminal': superliminal |
| } |
| |
| expected = { |
| 'name': 'my_superliminal_test', |
| 'pipeline_defaults': {'after_tasks': [{'task': 'end-1', 'type': 'spark'}, |
| {'task': 'end-2', 'type': 'spark'}], |
| 'before_tasks': [{'task': 'start-1', 'type': 'spark'}, |
| {'task': 'start-2', 'type': 'spark'}]}, |
| 'super': 'super_superliminal', |
| 'task_defaults': {'task_def1': 'task_def1_value'}, |
| 'type': 'super' |
| } |
| |
| self.assertEqual(expected, |
| dict(config_util._ConfigUtil__merge_superliminals(superliminal, |
| super_superliminal))) |
| |
| @mock.patch('liminal.core.util.files_util.load') |
| @mock.patch.dict(os.environ, {'env': 'myenv', 'LIMINAL_STAND_ALONE_MODE': 'True'}) |
| def test_safe_load_with_variables(self, find_config_files_mock): |
| subliminal = { |
| 'name': 'my_subliminal_test', |
| 'type': 'sub', |
| 'super': 'my_superliminal_test', |
| 'variables': { |
| 'var': 'simple case', |
| 'var-2': '-case', |
| 'var_2': '_case', |
| 'image': 'prod image', |
| 'a': '{{env}}1', |
| 'b': '{{a}}2', |
| 'c': '{{a}}{{b}}2' |
| }, |
| 'pipelines': [ |
| {'name': 'mypipe1', 'param': '{{var}}', |
| 'tasks': [ |
| {'task': 'sub_tasks', |
| 'type': 'dummy'}, |
| ]}, |
| {'name': 'mypipe2', 'param': '{{var-2 }}', 'tasks': [ |
| {'task': 'sub_tasks', |
| 'type': 'dummy'}, |
| ]} |
| ], |
| 'pipeline_defaults': { |
| 'param1': '{{var-2}}' |
| }, |
| 'task_defaults': { |
| 'job_start': { |
| 'task_def1:': 'task_sub_def_value' |
| } |
| |
| }, |
| 'services': [ |
| { |
| 'name': 'my_python_server', |
| 'type': 'python_server', |
| 'image': '{{image}}' |
| }, |
| { |
| 'name': 'my_python_server_for_stg', |
| 'type': 'python_server', |
| 'image': '{{default_image}}' |
| } |
| ]} |
| |
| superliminal = { |
| 'name': 'my_superliminal_test', |
| 'type': 'super', |
| 'variables': { |
| 'var-2': 'override', |
| 'var3': 'super_var', |
| 'default_image': 'default_image_value', |
| 'image': 'default_image_value' |
| }, |
| 'super': 'super_superliminal', |
| 'pipeline_defaults': { |
| 'param2': '{{pipe-var}}', |
| 'param3': 'param3super_value', |
| 'before_tasks': [ |
| {'task': 'second_task', 'type': 'dummy'}, |
| ] |
| }, |
| 'task_defaults': { |
| 'pipeline': { |
| 'path': '{{var-2}}', |
| 'task_def1': 'task_def1_value', |
| 'task_def2': { |
| 'task_def2_1': 'task_def2_1_value', |
| } |
| } |
| } |
| } |
| super_superliminal = { |
| 'name': 'super_superliminal', |
| 'type': 'super', |
| 'variables': { |
| 'default_image': 'def_default_image_value' |
| }, |
| 'pipeline_defaults': { |
| 'global_conf': '{{var3}}', |
| 'param2': 'param2super_value', |
| 'param3': 'param3hyper_value', |
| 'param4': 'param4hyper_value', |
| 'after_tasks': [ |
| {'task': 'before_last_task', 'type': 'dummy'}, |
| ] |
| } |
| } |
| |
| expected = [{ |
| 'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'}], |
| 'name': 'my_subliminal_test', |
| 'pipeline_defaults': {'param1': '-case'}, |
| 'pipelines': [{'description': 'add defaults parameters for all pipelines', |
| 'global_conf': 'super_var', |
| 'name': 'mypipe1', |
| 'param': 'simple case', |
| 'param1': '-case', |
| 'param2': '{{pipe-var}}', |
| 'param3': 'param3super_value', |
| 'param4': 'param4hyper_value', |
| 'tasks': [{'task': 'start', |
| 'task_def1:': 'task_sub_def_value', |
| 'type': 'job_start'}, |
| {'task': 'second_task', 'type': 'dummy'}, |
| {'task': 'sub_tasks', 'type': 'dummy'}, |
| {'task': 'before_last_task', 'type': 'dummy'}, |
| {'task': 'end', 'type': 'job_end'}]}, |
| {'description': 'add defaults parameters for all pipelines', |
| 'global_conf': 'super_var', |
| 'name': 'mypipe2', |
| 'param': '-case', |
| 'param1': '-case', |
| 'param2': '{{pipe-var}}', |
| 'param3': 'param3super_value', |
| 'param4': 'param4hyper_value', |
| 'tasks': [{'task': 'start', |
| 'task_def1:': 'task_sub_def_value', |
| 'type': 'job_start'}, |
| {'task': 'second_task', 'type': 'dummy'}, |
| {'task': 'sub_tasks', 'type': 'dummy'}, |
| {'task': 'before_last_task', 'type': 'dummy'}, |
| {'task': 'end', 'type': 'job_end'}]}], |
| 'service_defaults': {'description': 'add defaults parameters for all ' |
| 'services'}, |
| 'images': [], |
| 'services': [{'description': 'add defaults parameters for all services', |
| 'image': 'prod image', |
| 'name': 'my_python_server', |
| 'type': 'python_server'}, |
| {'description': 'add defaults parameters for all services', |
| 'image': 'default_image_value', |
| 'name': 'my_python_server_for_stg', |
| 'type': 'python_server'}], |
| 'super': 'my_superliminal_test', |
| 'task_defaults': {'job_start': {'task_def1:': 'task_sub_def_value'}}, |
| 'type': 'sub', |
| 'variables': {'a': 'myenv1', |
| 'b': 'myenv12', |
| 'c': 'myenv1myenv122', |
| 'image': 'prod image', |
| 'var': 'simple case', |
| 'var-2': '-case', |
| 'var_2': '_case'} |
| }] |
| |
| find_config_files_mock.return_value = { |
| 'my_subliminal_test': subliminal, |
| 'my_superliminal_test': superliminal, |
| 'super_superliminal': super_superliminal |
| } |
| |
| config_util = ConfigUtil('') |
| |
| self.assertEqual(expected, config_util.safe_load(is_render_variables=True)) |
| |
| # validate cache |
| self.assertEqual(expected, config_util.loaded_subliminals) |
| |
| @mock.patch('os.path.exists') |
| @mock.patch('liminal.core.environment.get_airflow_home_dir') |
| @mock.patch('liminal.core.util.files_util.load') |
| @mock.patch.dict(os.environ, {'LIMINAL_STAND_ALONE_MODE': 'True', 'POD_NAMESPACE': 'my_pod_ns'}) |
| def test_liminal_config_snapshot(self, find_config_files_mock, |
| get_airflow_dir_mock, path_exists_mock): |
| subliminal = { |
| 'name': 'my_subliminal_test', |
| 'type': 'sub', |
| 'variables': { |
| 'var': 1, |
| 'var-2': True |
| }, |
| 'pipelines': [ |
| {'name': 'mypipe1', 'param': '{{var}}'}, |
| {'name': 'mypipe2', 'param': '{{var-2 }}'} |
| ] |
| } |
| |
| expected = { |
| 'name': 'my_subliminal_test', 'type': 'sub', |
| 'executors': [{'executor': 'default_k8s', 'type': 'kubernetes'}], |
| 'service_defaults': {'description': 'add defaults parameters for all services'}, |
| 'task_defaults': { |
| 'description': 'add defaults parameters for all tasks separate by task type', |
| 'python': {'executor': 'default_k8s'}}, 'pipeline_defaults': { |
| 'description': 'add defaults parameters for all pipelines', |
| 'before_tasks': [{'task': 'start', 'type': 'job_start'}], |
| 'after_tasks': [{'task': 'end', 'type': 'job_end'}]}, |
| 'variables': {'var': 1, 'var-2': True}, 'pipelines': [ |
| {'name': 'mypipe1', 'param': '1', |
| 'description': 'add defaults parameters for all pipelines', |
| 'tasks': [{'task': 'start', 'type': 'job_start'}, |
| {'task': 'end', 'type': 'job_end'}]}, |
| {'name': 'mypipe2', 'param': 'True', |
| 'description': 'add defaults parameters for all pipelines', |
| 'tasks': [{'task': 'start', 'type': 'job_start'}, |
| {'task': 'end', 'type': 'job_end'}]}], |
| 'images': [], |
| 'services': [] |
| } |
| |
| find_config_files_mock.return_value = { |
| 'my_subliminal_test': subliminal |
| } |
| |
| get_airflow_dir_mock.return_value = '/tmp' |
| path_exists_mock.return_value = True |
| |
| with mock.patch('builtins.open', mock.mock_open()) as m: |
| with mock.patch('yaml.dump') as ydm: |
| config_util = ConfigUtil('') |
| config_util.safe_load(is_render_variables=True) |
| config_util.snapshot_final_liminal_configs() |
| |
| m.assert_called_once_with( |
| os.path.join('/tmp', '../liminal_config_files/my_subliminal_test.yml'), 'w') |
| ydm.assert_called_once_with(expected, m.return_value, default_flow_style=False) |
| |
| @mock.patch('liminal.core.util.files_util.load') |
| def test_soft_merge_load(self, find_config_files_mock): |
| subliminal = { |
| 'name': 'my_name', |
| 'type': 'sub', |
| 'super': 'my_super' |
| } |
| find_config_files_mock.return_value = {'my_subliminal_test': subliminal} |
| |
| config_util = ConfigUtil('') |
| |
| self.assertEqual([subliminal], |
| config_util.safe_load(is_render_variables=True, soft_merge=True)) |
| |
| def test_non_soft_merge_load(self): |
| subliminal = { |
| 'name': 'my_name', |
| 'type': 'sub', |
| 'super': 'my_super' |
| } |
| |
| config_util = ConfigUtil('') |
| |
| self.assertRaises(FileNotFoundError, |
| config_util._ConfigUtil__get_superliminal, |
| subliminal, |
| False) |