#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
| |
| import logging |
| import os |
| import traceback |
| |
| from liminal.core import environment |
| from liminal.core.config.defaults import base, default_configs |
| from liminal.core.util import dict_util, files_util |
| |
| |
class ConfigUtil:
    """
    Load and enrich config files under configs_path.

    A config whose ``type`` is ``super`` is a superliminal; every other config
    is a subliminal. ``safe_load`` enriches each subliminal with the chain of
    superliminals reachable through its ``super`` key, falling back to the
    built-in base config when no ``super`` is named.
    """

    # Keys looked up in the config dictionaries.
    __BASE = 'base'
    __PIPELINES = 'pipelines'
    __SUPER = 'super'
    __TYPE = 'type'
    __SUB = 'sub'
    __SERVICES = 'services'
    __TASKS = 'tasks'
    __PIPELINE_DEFAULTS = 'pipeline_defaults'
    __TASK_DEFAULTS = 'task_defaults'
    __BEFORE_TASKS = 'before_tasks'
    __AFTER_TASKS = 'after_tasks'
    __EXECUTORS = 'executors'
    __IMAGES = 'images'

    def __init__(self, configs_path):
        """
        :param configs_path: location handed to ``files_util.load`` to read the configs
        """
        self.configs_path = configs_path
        # NOTE(review): used below as a mapping (``.values()`` / ``.get(name)``);
        # presumably keyed by config name - confirm against files_util.load.
        self.config_files = files_util.load(configs_path)
        self.base = base.BASE
        self.loaded_subliminals = []
        self.snapshot_path = os.path.join(environment.get_airflow_home_dir(), '../liminal_config_files')

    def safe_load(self, is_render_variables, soft_merge=False):
        """
        Enrich every subliminal config with its defaults and superliminals.

        A config that raises while loading is logged (with traceback) and
        skipped, so one broken file does not prevent the others from loading.
        The result is cached on the instance: once a non-empty result exists,
        later calls return it as-is regardless of the arguments passed.

        :param is_render_variables: forwarded to the variable substitution step
        :param soft_merge: when True, a missing superliminal only logs a warning
            and the subliminal is returned without enrichment
        :returns: list of config files after enrich with defaults and supers
        """
        if self.loaded_subliminals:
            return self.loaded_subliminals

        # Classify all configs up front; __is_subliminal also stamps the
        # default type on every subliminal before any merging starts.
        subliminals = [config for config in self.config_files.values() if self.__is_subliminal(config)]

        enriched_configs = []
        for subliminal in subliminals:
            name = subliminal.get('name')
            logging.info(f'Loading yml {name}')
            # noinspection PyBroadException
            try:
                superliminal = self.__get_superliminal(subliminal, soft_merge)
                enriched_configs.append(
                    self.__merge_configs(subliminal, superliminal, is_render_variables, soft_merge)
                )
            except Exception:
                # Best-effort by design: record the failure and keep loading the rest.
                logging.exception(f'Failed to load yml {name}')

        self.loaded_subliminals = enriched_configs

        return self.loaded_subliminals

    def __merge_configs(self, subliminal, superliminal, is_render_variables, soft_merge):
        """
        Merge ``subliminal`` with ``superliminal`` after recursively merging the
        superliminal with its own ancestors.

        :returns: ``subliminal`` itself when there is no superliminal, otherwise
            the merged config (built from shallow copies of both inputs)
        """
        if not superliminal:
            return subliminal

        sub = subliminal.copy()
        supr = superliminal.copy()

        # Resolve the superliminal's own chain first so that sub is merged
        # against the fully enriched parent.
        merged_superliminal = self.__merge_configs(
            supr, self.__get_superliminal(supr, soft_merge), is_render_variables, soft_merge
        )

        sub[self.__EXECUTORS] = self.__merge_section(sub, merged_superliminal, self.__EXECUTORS)
        sub[self.__IMAGES] = self.__merge_section(sub, merged_superliminal, self.__IMAGES)

        if self.__is_subliminal(sub):
            return self.__merge_sub_and_super(sub, merged_superliminal, is_render_variables)
        return self.__merge_superliminals(sub, merged_superliminal)

    def __get_superliminal(self, liminal, soft_merge):
        """
        Find the config that ``liminal`` inherits from.

        :returns: ``{}`` for the base config (it has no parent); the built-in
            base config when ``liminal`` names no ``super``; otherwise the
            loaded config with that name. A named but missing superliminal
            yields ``{}`` when ``soft_merge`` is set.
        :raises FileNotFoundError: the named superliminal is missing and
            ``soft_merge`` is not set
        """
        if self.__is_base_config(liminal):
            return {}

        superliminal_name = liminal.get(self.__SUPER, '')
        if not superliminal_name:
            return self.base

        superliminal = self.__get_config(superliminal_name)
        if superliminal:
            return superliminal

        supr_is_missing_msg = f"superliminal '{superliminal_name}' is missing from '{self.configs_path}'"
        if not soft_merge:
            raise FileNotFoundError(supr_is_missing_msg)
        logging.warning(supr_is_missing_msg)
        # Always hand back a dict so callers only need a truthiness check.
        return {}

    def __get_base_config(self):
        """Return the built-in base config."""
        return self.base

    def __is_base_config(self, config):
        """Return True when ``config`` is named like the base config."""
        return config.get('name', '') == self.__BASE

    def __is_subliminal(self, config):
        """
        Return True unless ``config`` declares ``type: super``.

        Side effect: a subliminal gets its ``type`` set to ``sub`` in place, so
        configs that omit the key carry the default from then on.
        """
        is_subliminal = config.get(self.__TYPE, self.__SUB) != self.__SUPER
        if is_subliminal:
            config[self.__TYPE] = self.__SUB
        return is_subliminal

    def __get_config(self, config_name):
        """Return the loaded config called ``config_name``, or None when absent."""
        return self.config_files.get(config_name)

    def __merge_sub_and_super(self, sub, supr, is_render_variables):
        """
        Enrich subliminal ``sub`` (mutated in place) with superliminal ``supr``:
        pipeline defaults, service defaults, a dict merge of both configs and
        finally variable substitution.
        """
        sub[self.__PIPELINES] = [
            self.__apply_pipeline_defaults(sub, supr, pipeline) for pipeline in sub.get(self.__PIPELINES, [])
        ]
        sub[self.__SERVICES] = default_configs.apply_service_defaults(sub, supr)

        sub = dict_util.merge_dicts(supr.copy(), sub)

        return default_configs.apply_variable_substitution(sub, supr, is_render_variables)

    def __merge_superliminals(self, super1, super2):
        """
        Merge superliminal ``super1`` with its parent ``super2``.

        ``before_tasks`` become parent-then-child and ``after_tasks``
        child-then-parent. Both keys are removed from the parent's pipeline
        defaults before the final dict merge. Both arguments have their
        ``pipeline_defaults`` entry replaced.
        """
        super1_pipeline_defaults = super1.get(self.__PIPELINE_DEFAULTS, {}).copy()
        super2_pipeline_defaults = super2.get(self.__PIPELINE_DEFAULTS, {}).copy()

        before_tasks = super2_pipeline_defaults.pop(self.__BEFORE_TASKS, []) + super1_pipeline_defaults.pop(
            self.__BEFORE_TASKS, []
        )
        after_tasks = super1_pipeline_defaults.pop(self.__AFTER_TASKS, []) + super2_pipeline_defaults.pop(
            self.__AFTER_TASKS, []
        )

        super1_pipeline_defaults[self.__BEFORE_TASKS] = before_tasks
        super1_pipeline_defaults[self.__AFTER_TASKS] = after_tasks
        super1[self.__PIPELINE_DEFAULTS] = super1_pipeline_defaults
        super2[self.__PIPELINE_DEFAULTS] = super2_pipeline_defaults

        # merge supers tasks
        return dict_util.merge_dicts(super1, super2, True)

    def snapshot_final_liminal_configs(self):
        """Dump the loaded subliminal configs to ``snapshot_path`` via files_util."""
        files_util.dump_liminal_configs(liminal_configs=self.loaded_subliminals, path=self.snapshot_path)

    def __merge_section(self, subliminal, superliminal, section):
        """
        Merge the list found under ``section`` in both configs.

        Items are matched on the section name minus its last character
        ('executors' -> 'executor', 'images' -> 'image').
        """
        unique_key_name = section[:-1]
        return self.__deep_list_keyword_merge(
            unique_key_name, subliminal.get(section, []), superliminal.get(section, [])
        )

    @staticmethod
    def __apply_pipeline_defaults(subliminal, superliminal, pipeline):
        """Delegate to ``default_configs.apply_pipeline_defaults``."""
        return default_configs.apply_pipeline_defaults(subliminal, superliminal, pipeline)

    @staticmethod
    def __deep_list_keyword_merge(unique_key_name, subliminal_list_conf, superliminal_list_conf):
        """
        Merge two lists of dicts, pairing items by their ``unique_key_name`` value.

        Both lists are indexed by that key and the two maps are merged
        recursively with the superliminal map as first argument; the merged
        values are returned as a list. An item lacking the key raises KeyError.
        """
        subliminal_key_map = {item[unique_key_name]: item for item in subliminal_list_conf}
        superliminal_key_map = {item[unique_key_name]: item for item in superliminal_list_conf}

        return list(dict_util.merge_dicts(superliminal_key_map, subliminal_key_map, recursive=True).values())