# blob: 04f26e74491a9fb28dcd598607561f388209e298 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import os
import traceback
from liminal.core import environment
from liminal.core.config.defaults import base, default_configs
from liminal.core.util import dict_util, files_util
class ConfigUtil:
    """
    Load and enrich config files under configs_path.

    A config is treated as a "subliminal" unless its 'type' is 'super'. Each
    subliminal is merged with its chain of superliminals (resolved through the
    'super' key, falling back to the built-in base config when no 'super' is
    declared) to produce the final, enriched config.
    """

    # Config keys. Double-underscore names are name-mangled (class-private).
    __BASE = 'base'
    __PIPELINES = 'pipelines'
    __SUPER = 'super'
    __TYPE = 'type'
    __SUB = 'sub'
    __SERVICES = 'services'
    __TASKS = 'tasks'
    __PIPELINE_DEFAULTS = 'pipeline_defaults'
    __TASK_DEFAULTS = 'task_defaults'
    __BEFORE_TASKS = 'before_tasks'
    __AFTER_TASKS = 'after_tasks'
    __EXECUTORS = 'executors'
    __IMAGES = 'images'

    def __init__(self, configs_path):
        """
        :param configs_path: location whose config files are loaded via files_util.load
        """
        self.configs_path = configs_path
        # Looked up by config name in __get_config; presumably name -> parsed yml dict (verify in files_util).
        self.config_files = files_util.load(configs_path)
        # Implicit superliminal for configs that declare no 'super'.
        self.base = base.BASE
        # Cache of enriched subliminals produced by safe_load.
        self.loaded_subliminals = []
        self.snapshot_path = os.path.join(environment.get_airflow_home_dir(), '../liminal_config_files')

    def safe_load(self, is_render_variables, soft_merge=False):
        """
        Enrich every subliminal config with its superliminals and defaults.

        Results are cached on the instance: once a non-empty list has been
        produced, later calls return it regardless of their arguments.

        :param is_render_variables: forwarded to variable substitution of subliminals
        :param soft_merge: if True, a missing superliminal is logged as a warning
            instead of raising FileNotFoundError
        :returns: list of config files after enrich with defaults and supers;
            configs that fail to load are logged (with traceback) and skipped
        """
        if self.loaded_subliminals:
            return self.loaded_subliminals

        # __is_subliminal also stamps type='sub' on each matching config in place.
        subliminals = [config for config in self.config_files.values() if self.__is_subliminal(config)]
        enriched_configs = []
        for subliminal in subliminals:
            name = subliminal.get('name')
            logging.info(f'Loading yml {name}')
            # noinspection PyBroadException
            try:
                superliminal = self.__get_superliminal(subliminal, soft_merge)
                enriched_configs.append(
                    self.__merge_configs(subliminal, superliminal, is_render_variables, soft_merge)
                )
            except Exception:
                # Best effort: one broken yml must not prevent the others from loading,
                # but the traceback goes through logging rather than raw stderr.
                logging.exception(f'Failed to load yml {name}')

        self.loaded_subliminals = enriched_configs
        return self.loaded_subliminals

    def __merge_configs(self, subliminal, superliminal, is_render_variables, soft_merge):
        """
        Recursively merge ``subliminal`` with ``superliminal`` and its ancestors.

        If ``superliminal`` is falsy (the base config was reached, or a missing
        superliminal was tolerated by soft_merge), ``subliminal`` is returned
        unchanged. Otherwise both are shallow-copied before their top-level keys
        are reassigned.
        """
        if not superliminal:
            return subliminal
        sub = subliminal.copy()
        supr = superliminal.copy()
        # Collapse the superliminal with its own ancestors first.
        merged_superliminal = self.__merge_configs(
            supr, self.__get_superliminal(supr, soft_merge), is_render_variables, soft_merge
        )
        sub[self.__EXECUTORS] = self.__merge_section(sub, merged_superliminal, self.__EXECUTORS)
        sub[self.__IMAGES] = self.__merge_section(sub, merged_superliminal, self.__IMAGES)
        if self.__is_subliminal(sub):
            return self.__merge_sub_and_super(sub, merged_superliminal, is_render_variables)
        return self.__merge_superliminals(sub, merged_superliminal)

    def __get_superliminal(self, liminal, soft_merge):
        """
        Resolve the direct superliminal of ``liminal``.

        :returns: {} for the base config itself; self.base when no 'super' is
            declared; otherwise the named config, or whatever falsy value the
            lookup produced if it is missing and soft_merge is True
        :raises FileNotFoundError: if the named superliminal is missing and
            soft_merge is False
        """
        if self.__is_base_config(liminal):
            return {}
        superliminal_name = liminal.get(self.__SUPER, '')
        if not superliminal_name:
            return self.base
        superliminal = self.__get_config(superliminal_name)
        if not superliminal:
            supr_is_missing_msg = f"superliminal '{superliminal_name}' is missing from '{self.configs_path}'"
            if not soft_merge:
                raise FileNotFoundError(supr_is_missing_msg)
            logging.warning(supr_is_missing_msg)
        return superliminal

    def __get_base_config(self):
        """Return the built-in base config."""
        return self.base

    def __is_base_config(self, config):
        """Return True if ``config`` is named 'base'."""
        return config.get('name', '') == self.__BASE

    def __is_subliminal(self, config):
        """
        Return True unless ``config`` declares type 'super'.

        Side effect: subliminal configs get their 'type' set to 'sub' in place.
        """
        is_subliminal = config.get(self.__TYPE, self.__SUB) != self.__SUPER
        if is_subliminal:
            config[self.__TYPE] = self.__SUB
        return is_subliminal

    def __get_config(self, config_name):
        """Return the loaded config registered under ``config_name``, or None."""
        return self.config_files.get(config_name)

    def __merge_sub_and_super(self, sub, supr, is_render_variables):
        """
        Apply a (fully merged) superliminal to a subliminal.

        Pipeline defaults and service defaults are applied first. The two
        configs are then combined with dict_util.merge_dicts, using a copy of
        the superliminal as the first argument (presumably subliminal values
        win; see dict_util). Finally, variable substitution is applied.
        """
        sub[self.__PIPELINES] = [
            self.__apply_pipeline_defaults(sub, supr, pipeline) for pipeline in sub.get(self.__PIPELINES, [])
        ]
        sub[self.__SERVICES] = default_configs.apply_service_defaults(sub, supr)
        sub = dict_util.merge_dicts(supr.copy(), sub)
        return default_configs.apply_variable_substitution(sub, supr, is_render_variables)

    def __merge_superliminals(self, super1, super2):
        """
        Merge superliminal ``super1`` with its (already merged) ancestor ``super2``.

        In pipeline_defaults, before_tasks become super2's followed by super1's,
        and after_tasks become super1's followed by super2's. Both lists are
        removed from super2's pipeline_defaults so that the final merge cannot
        override the concatenated lists. Reassigns the 'pipeline_defaults' key
        of both arguments (the nested dicts themselves are copies).
        """
        super1_defaults = super1.get(self.__PIPELINE_DEFAULTS, {}).copy()
        super2_defaults = super2.get(self.__PIPELINE_DEFAULTS, {}).copy()

        # Ancestor hooks wrap the descendant's: ancestor-before first, ancestor-after last.
        before_tasks = super2_defaults.pop(self.__BEFORE_TASKS, []) + super1_defaults.pop(self.__BEFORE_TASKS, [])
        after_tasks = super1_defaults.pop(self.__AFTER_TASKS, []) + super2_defaults.pop(self.__AFTER_TASKS, [])
        super1_defaults[self.__BEFORE_TASKS] = before_tasks
        super1_defaults[self.__AFTER_TASKS] = after_tasks

        super1[self.__PIPELINE_DEFAULTS] = super1_defaults
        super2[self.__PIPELINE_DEFAULTS] = super2_defaults
        # merge supers tasks (third positional arg presumably enables recursive merge; see dict_util)
        return dict_util.merge_dicts(super1, super2, True)

    def snapshot_final_liminal_configs(self):
        """Dump the enriched subliminals cached by safe_load to self.snapshot_path."""
        files_util.dump_liminal_configs(liminal_configs=self.loaded_subliminals, path=self.snapshot_path)

    def __merge_section(self, subliminal, superliminal, section):
        """
        Merge a list-valued ``section`` (e.g. 'executors', 'images') of both configs.

        List items are matched by the key named after the singular section
        ('executors' -> 'executor', 'images' -> 'image').
        """
        return self.__deep_list_keyword_merge(section[:-1], subliminal.get(section, []), superliminal.get(section, []))

    @staticmethod
    def __apply_pipeline_defaults(subliminal, superliminal, pipeline):
        """Delegate to default_configs.apply_pipeline_defaults."""
        return default_configs.apply_pipeline_defaults(subliminal, superliminal, pipeline)

    @staticmethod
    def __deep_list_keyword_merge(unique_key_name, subliminal_list_conf, superliminal_list_conf):
        """
        Merge two lists of dicts, matching items by ``item[unique_key_name]``.

        Items lacking ``unique_key_name`` raise KeyError. The superliminal map
        is passed first to dict_util.merge_dicts with recursive=True.
        """
        subliminal_key_map = {item[unique_key_name]: item for item in subliminal_list_conf}
        superliminal_key_map = {item[unique_key_name]: item for item in superliminal_list_conf}
        return list(dict_util.merge_dicts(superliminal_key_map, subliminal_key_map, recursive=True).values())