[LIMINAL-57] add config utils

diff --git a/docs/liminal/advanced.liminal.yml.md b/docs/liminal/advanced.liminal.yml.md
index 77442dc..95f3ed3 100644
--- a/docs/liminal/advanced.liminal.yml.md
+++ b/docs/liminal/advanced.liminal.yml.md
@@ -23,8 +23,8 @@
 
 ## Variables
 
-Much like in programming languages, you can define variables for re-use across your
-liminal.yml file.
+Much like in programming languages, you can define variables for re-use across your liminal.yml
+file.
 
 ```yaml
 variables:
@@ -40,6 +40,7 @@
 Make sure that any string that includes placeholders is surrounded by single or double quotes.
 
 For example:
+
 ```yaml
 variables:
   image_name: myorg/myrepo:myapp
@@ -66,13 +67,15 @@
 following order:
 
 1. Current [DAG run conf](https://airflow.apache.org/docs/apache-airflow/stable/dag-run.html)
-(Airflow).
+   (Airflow).
 2. liminal.yml [variables](#variables) section.
 3. [Airflow variables](https://airflow.apache.org/docs/apache-airflow/stable/howto/variable.html)
-(Airflow).
+   (Airflow).
-4. [Airflow macros](https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html) (Airflow)
-. For example: `"{{yesterday_ds}}"`. For more information see:
-5. Airflow [Jinja Templating](https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#jinja-templating)
-(Airflow).
+4. [Airflow macros](https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html)
+   (Airflow). For example: `"{{yesterday_ds}}"`. For more information see:
+5. Airflow
+   [Jinja Templating](https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#jinja-templating)
+   (Airflow).
 
 #### Build placeholder rendering
@@ -86,7 +89,8 @@
 ## Task variables
 
 In addition to the variables section you can also set specific variables for specific `task`s using
 the `variables` attribute of that task. For example:
 
 ```yaml
   - task: my_task
@@ -146,7 +150,7 @@
 In the example above we define an `executor` of type `kubernetes` with custom resources
 configuration.
 
-`executors` is a section in the root of your liminal.yml file and is a list of `executor`s defined 
+`executors` is a section in the root of your liminal.yml file and is a list of `executor`s defined
 by the following attributes:
 
 ### executor attributes
@@ -160,7 +164,8 @@
 ## Task Defaults
 
 `task_defaults` is a section in the root of your liminal.yml file in which default attributes can be
 set for each `task` type.
 
 ```yaml
 task_defaults:
@@ -178,7 +183,7 @@
 If the same attribute is defined in both `task_defaults` and in the `task`, definitions from the
 `task` take precedence. If a map (dictionary) of values (for example `env_vars`) is defined in both
 `task_defaults` the two maps will be merged, with definitions in the `task` taking precedence in
-case key is defined in both maps. 
+case key is defined in both maps.
 
 ## Pipeline Defaults
 
@@ -195,26 +200,25 @@
 Each `pipeline` that does not set these attributes will default to the setting in
 `pipeline_defaults`.
 
-If the same attribute is defined in both `pipeline_defaults` and in the `pipeline`, definitions
-from the `pipeline` take precedence.
+If the same attribute is defined in both `pipeline_defaults` and in the `pipeline`, definitions from
+the `pipeline` take precedence.
 
 If `tasks` section is defined in `pipeline_defaults` each pipeline defined in our liminal.yml file
 will have the tasks defined in `pipeline_defaults`.
 
-A special `task` type `pipeline` may be used in `pipeline_defaults` `tasks` section. This task
-type is interpreted as "tasks defined in pipeline go here". This allows flexibility of defining
-common tasks to be run before and after the tasks of each pipeline defined in our liminal.yml
-file. For example:
+A special `task` type `pipeline` may be used in `pipeline_defaults` `tasks` section. This task type
+is interpreted as "tasks defined in pipeline go here". This allows flexibility of defining common
+tasks to be run before and after the tasks of each pipeline defined in our liminal.yml file. For
+example:
 
 ```yaml
 pipeline_defaults:
-  tasks:
+  before_tasks:
     - task: my_common_setup_task
       type: python
       image: myorg/myrepo:mypythonapp
       cmd: python -u my_setup_module.py
-    - task: pipeline_tasks
-      type: pipeline
+  after_tasks:
     - task: my_common_teardown_task1
       type: python
       image: myorg/myrepo:mypythonapp
@@ -239,7 +243,7 @@
 super: my_super
 ```
 
-A super is found by this name if a liminal.yml file in your environment exists with that name. A 
+A super is found by this name if a liminal.yml file in your environment exists with that name. A
 superliminal layer liminal.yml file needs to define its `type` as `super`:
 
 ```yaml
@@ -257,11 +261,11 @@
 super: my_other_super
 ```
 
-A liminal system can have 1 subliminal layer but many superliminal layers using inheritence.
-This allows us to chain common behaviors of our systems into several superliminal layers.
+A liminal system can have 1 subliminal layer but many superliminal layers using inheritance. This
+allows us to chain common behaviors of our systems into several superliminal layers.
 
 If no `super` is defined for a liminal.yml file then it defaults to having the
-[hyperliminal](https://github.com/Natural-Intelligence/liminal/blob/master/liminal/core/config/defaults/hyperliminal/liminal.yml)
+[base](https://github.com/apache/incubator-liminal/blob/master/liminal/core/config/defaults/base/liminal.yml)
 be its super.
 
-### superliminal attribtues
+### superliminal attributes
@@ -276,5 +280,5 @@
 pipeline_defaults
 ```
 
-If the same attribute section is defined in both a superliminal and a lower layer of the system
-they will be merged, with key collisions favoring the lower level layer.
+If the same attribute section is defined in both a superliminal and a lower layer of the system they
+will be merged, with key collisions favoring the lower level layer.
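The layered merge described above (subliminal over superliminal, with the documented placeholder-resolution order: DAG run conf, then liminal.yml variables, then Airflow variables) can be sketched as a first-match lookup. The function and parameter names below are illustrative only, not part of liminal's API:

```python
def resolve_placeholder(key, dag_run_conf, yml_variables, airflow_variables):
    """Return the first value found for key, honoring the documented precedence:
    DAG run conf > liminal.yml variables > Airflow variables."""
    for source in (dag_run_conf, yml_variables, airflow_variables):
        if key in source:
            return source[key]
    # an unresolved placeholder is left as-is for Airflow's Jinja templating
    return '{{' + key + '}}'
```

For example, `resolve_placeholder('image', {}, {'image': 'myorg/myrepo:myapp'}, {})` falls through to the liminal.yml value because nothing was passed in the DAG run conf.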
diff --git a/docs/source/How_to_install_liminal_in_airflow_on_kubernetes.md b/docs/source/How_to_install_liminal_in_airflow_on_kubernetes.md
index 7afcad0..a5e730c 100644
--- a/docs/source/How_to_install_liminal_in_airflow_on_kubernetes.md
+++ b/docs/source/How_to_install_liminal_in_airflow_on_kubernetes.md
@@ -9,7 +9,7 @@
 
   http://www.apache.org/licenses/LICENSE-2.0
 
-Unless required bgit y applicable law or agreed to in writing,
+Unless required by applicable law or agreed to in writing,
 software distributed under the License is distributed on an
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 KIND, either express or implied.  See the License for the
diff --git a/docs/source/install_liminal_in_airflow_on_kubernetes.sh b/docs/source/install_liminal_in_airflow_on_kubernetes.sh
index bcb0419..608c299 100755
--- a/docs/source/install_liminal_in_airflow_on_kubernetes.sh
+++ b/docs/source/install_liminal_in_airflow_on_kubernetes.sh
@@ -1,3 +1,20 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
-#! /bin/bash
 
 help() {
diff --git a/liminal/build/liminal_apps_builder.py b/liminal/build/liminal_apps_builder.py
index 19b065f..5facc7d 100644
--- a/liminal/build/liminal_apps_builder.py
+++ b/liminal/build/liminal_apps_builder.py
@@ -91,28 +91,20 @@
 image_builders_package = 'liminal.build.image'
 # user_image_builders_package = 'TODO: user_image_builders_package'
 
-TASK_BUILD_CLASSES = class_util.find_subclasses_in_packages(
+task_build_types = class_util.find_subclasses_in_packages(
     [image_builders_package],
     ImageBuilder)
 
 
-def get_types_dict(task_build_classes):
-    # take module name from class name
-    return {x.split(".")[-2]: c for x, c in task_build_classes.items()}
-
-
-task_build_types = get_types_dict(TASK_BUILD_CLASSES)
-
-logging.info(f'Finished loading image builder implementations: {TASK_BUILD_CLASSES}')
+logging.info(f'Finished loading image builder implementations: {task_build_types}')
 logging.info(f'Loading service image builder implementations..')
 
 # TODO: add configuration for user service image builders package
 service_builders_package = 'liminal.build.service'
 # user_service_builders_package = 'TODO: user_service_builders_package'
 
-service_build_classes = class_util.find_subclasses_in_packages(
+service_build_types = class_util.find_subclasses_in_packages(
     [service_builders_package],
     ServiceImageBuilderMixin)
 
-service_build_types = get_types_dict(service_build_classes)
-logging.info(f'Finished loading service image builder implementations: {service_build_classes}')
+logging.info(f'Finished loading service image builder implementations: {service_build_types}')
diff --git a/liminal/runners/airflow/tasks/defaults/__init__.py b/liminal/core/config/__init__.py
similarity index 100%
rename from liminal/runners/airflow/tasks/defaults/__init__.py
rename to liminal/core/config/__init__.py
diff --git a/liminal/core/config/config.py b/liminal/core/config/config.py
new file mode 100644
index 0000000..7fa1ff0
--- /dev/null
+++ b/liminal/core/config/config.py
@@ -0,0 +1,159 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+import traceback
+
+from liminal.core import environment
+from liminal.core.config.defaults import base, default_configs
+from liminal.core.util import dict_util
+from liminal.core.util import files_util
+
+
+class ConfigUtil:
+    """
+    Load and enrich config files under configs_path.
+    """
+    __BASE = "base"
+    __PIPELINES = "pipelines"
+    __SUPER = "super"
+    __TYPE = "type"
+    __SUB = "sub"
+    __SERVICES = "services"
+    __TASKS = "tasks"
+    __PIPELINE_DEFAULTS = "pipeline_defaults"
+    __BEFORE_TASKS = "before_tasks"
+    __AFTER_TASKS = "after_tasks"
+
+    def __init__(self, configs_path):
+        self.configs_path = configs_path
+        self.config_files = files_util.load(configs_path)
+        self.base = base.BASE
+        self.loaded_subliminals = []
+        self.snapshot_path = os.path.join(environment.get_airflow_home_dir(),
+                                          '../liminal_config_files')
+
+    def safe_load(self, is_render_variables):
+        """
+        :returns: list of config files after enrichment with defaults and superliminals
+        """
+        if self.loaded_subliminals:
+            return self.loaded_subliminals
+
+        configs = self.config_files.values()
+        enriched_configs = []
+
+        for subliminal in [config for config in configs if self.__is_subliminal(config)]:
+            name = subliminal.get('name')
+            logging.info(f'Loading yml {name}')
+            # noinspection PyBroadException
+            try:
+                superliminal = self.__get_superliminal(subliminal)
+                enriched_config = self.__merge_configs(subliminal, superliminal,
+                                                       is_render_variables)
+                enriched_configs.append(enriched_config)
+            except Exception:
+                logging.error(f'Failed to load yml {name}')
+                traceback.print_exc()
+
+        self.loaded_subliminals = enriched_configs
+
+        return self.loaded_subliminals
+
+    def __merge_configs(self, subliminal, superliminal, is_render_variables):
+        if not superliminal:
+            return subliminal
+
+        sub = subliminal.copy()
+        supr = superliminal.copy()
+
+        merged_superliminal = self.__merge_configs(supr, self.__get_superliminal(supr),
+                                                   is_render_variables)
+
+        if self.__is_subliminal(sub):
+            return self.__merge_sub_and_super(sub, merged_superliminal, is_render_variables)
+        else:
+            return self.__merge_superliminals(sub, merged_superliminal)
+
+    def __get_superliminal(self, liminal):
+        superliminal = {}
+        if not self.__is_base_config(liminal):
+            superliminal_name = liminal.get(self.__SUPER, '')
+            if not superliminal_name:
+                superliminal = self.base
+            else:
+                superliminal = self.__get_config(superliminal_name)
+
+                if not superliminal:
+                    raise FileNotFoundError(
+                        f"superliminal '{superliminal_name}' is missing from '{self.configs_path}'")
+
+        return superliminal
+
+    def __get_base_config(self):
+        return self.base
+
+    def __is_base_config(self, config):
+        return config.get('name', '') == self.__BASE
+
+    def __is_subliminal(self, config):
+        is_subliminal = config.get(self.__TYPE, self.__SUB) != self.__SUPER
+        if is_subliminal:
+            config[self.__TYPE] = self.__SUB
+        return is_subliminal
+
+    def __get_config(self, config_name):
+        return self.config_files.get(config_name)
+
+    def __merge_sub_and_super(self, sub, supr, is_render_variables):
+        merged_pipelines = list()
+
+        for pipeline in sub.get(self.__PIPELINES, {}):
+            final_pipeline = self.__apply_pipeline_defaults(sub, supr, pipeline)
+            merged_pipelines.append(final_pipeline)
+
+        sub[self.__PIPELINES] = merged_pipelines
+        sub[self.__SERVICES] = default_configs.apply_service_defaults(sub, supr)
+
+        sub = dict_util.merge_dicts(supr.copy(), sub)
+
+        return default_configs.apply_variable_substitution(sub, supr, is_render_variables)
+
+    def __merge_superliminals(self, super1, super2):
+        super1_pipeline_defaults = super1.get(self.__PIPELINE_DEFAULTS, {}).copy()
+        super2_pipeline_defaults = super2.get(self.__PIPELINE_DEFAULTS, {}).copy()
+
+        super1[self.__PIPELINE_DEFAULTS][self.__BEFORE_TASKS] = \
+            super2_pipeline_defaults.pop(self.__BEFORE_TASKS, []) + super1_pipeline_defaults.pop(
+                self.__BEFORE_TASKS, [])
+
+        super1[self.__PIPELINE_DEFAULTS][self.__AFTER_TASKS] = \
+            super1_pipeline_defaults.pop(self.__AFTER_TASKS, []) + super2_pipeline_defaults.pop(
+                self.__AFTER_TASKS, [])
+
+        # merge supers tasks
+        return dict_util.merge_dicts(super1, super2, True)
+
+    def snapshot_final_liminal_configs(self):
+        files_util.dump_liminal_configs(liminal_configs=self.loaded_subliminals,
+                                        path=self.snapshot_path)
+
+    @staticmethod
+    def __apply_pipeline_defaults(subliminal, superliminal, pipeline):
+        return default_configs.apply_pipeline_defaults(subliminal, superliminal, pipeline)
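The task-ordering produced by `__merge_superliminals` above is easy to misread: when superliminals are chained, the more distant super's `before_tasks` run first and its `after_tasks` run last, so each layer wraps the one below it. A condensed, standalone sketch of just that ordering (function and key names illustrative):

```python
def merge_super_tasks(lower, higher):
    """Sketch of chained-superliminal task ordering: 'lower' is the super
    closer to the subliminal, 'higher' is its own super. The higher layer
    wraps the lower one."""
    return {
        'before_tasks': higher.get('before_tasks', []) + lower.get('before_tasks', []),
        'after_tasks': lower.get('after_tasks', []) + higher.get('after_tasks', []),
    }
```

So a monitoring super layered above a setup super would emit its start task before the setup tasks and its reporting task after all teardown tasks.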
diff --git a/liminal/core/config/defaults/base/__init__.py b/liminal/core/config/defaults/base/__init__.py
new file mode 100644
index 0000000..38e28b9
--- /dev/null
+++ b/liminal/core/config/defaults/base/__init__.py
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from liminal.core.util import files_util
+
+BASE = files_util.load(os.path.dirname(__file__))['base'].copy()
diff --git a/liminal/core/config/defaults/base/liminal.yml b/liminal/core/config/defaults/base/liminal.yml
new file mode 100644
index 0000000..d2a1783
--- /dev/null
+++ b/liminal/core/config/defaults/base/liminal.yml
@@ -0,0 +1,32 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: base
+type: super
+service_defaults:
+  description: add default parameters for all services
+task_defaults:
+  description: add default parameters for all tasks, separated by task type
+pipeline_defaults:
+  description: add default parameters for all pipelines
+  before_tasks:
+    - task: start
+      type: job_start
+  after_tasks:
+    - task: end
+      type: job_end
\ No newline at end of file
diff --git a/liminal/core/config/defaults/default_configs.py b/liminal/core/config/defaults/default_configs.py
new file mode 100644
index 0000000..25d5001
--- /dev/null
+++ b/liminal/core/config/defaults/default_configs.py
@@ -0,0 +1,132 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from liminal.core.util import dict_util
+from liminal.core.util.dict_util import merge_dicts
+
+__SERVICES = "services"
+__TASKS = "tasks"
+__BEFORE_TASKS = "before_tasks"
+__AFTER_TASKS = "after_tasks"
+
+
+def apply_variable_substitution(subliminal, superliminal, is_render_variables=False):
+    """
+    If is_render_variables is True, replace every {{variable.key}} placeholder in
+    subliminal with its value from subliminal.variables merged with
+    superliminal.variables; otherwise just merge subliminal.variables with
+    superliminal.variables, leaving placeholders unrendered.
+    """
+    keyword = "variables"
+    merged_variables = dict_util.merge_dicts(subliminal.get(keyword, {}),
+                                             superliminal.get(keyword, {}), True)
+    if is_render_variables:
+        for k, v in merged_variables.items():
+            if isinstance(v, str) or (not isinstance(v, dict) and not isinstance(v, list)):
+                merged_variables[k] = dict_util.replace_placholders_in_string(str(v),
+                                                                              merged_variables)
+
+        merged_variables = dict_util.replace_placeholders(merged_variables, merged_variables)
+        return dict_util.replace_placeholders(subliminal, merged_variables)
+    else:
+        subliminal[keyword] = merged_variables
+        return subliminal
+
+
+def apply_service_defaults(subliminal, superliminal):
+    """ Apply defaults services
+        :param subliminal: subliminal config
+        :param superliminal: superliminal config
+        :returns: enriched services with superliminal.service_defaults & subliminal.service_defaults
+    """
+    keyword = "service_defaults"
+    superliminal_service_defaults = superliminal.get(keyword, {})
+    subliminal_service_defaults = subliminal.get(keyword, {})
+
+    merged_service_defaults = merge_dicts(subliminal_service_defaults,
+                                          superliminal_service_defaults,
+                                          recursive=True)
+
+    return [merge_dicts(service, merged_service_defaults, True) for service in
+            subliminal.get(__SERVICES, {})]
+
+
+def apply_pipeline_defaults(subliminal, superliminal, pipeline):
+    """Apply defaults values on given pipeline
+    :param subliminal: subliminal config
+    :param superliminal: superliminal config
+    :param  pipeline: to apply defaults on
+
+    :returns: enriched pipeline with superliminal.pipeline_defaults & subliminal.pipeline_defaults
+    """
+    keyword = "pipeline_defaults"
+    superliminal_pipe_defaults = superliminal.get(keyword, {}).copy()
+    subliminal_pipe_defaults = subliminal.get(keyword, {}).copy()
+    superliminal_before_tasks = superliminal_pipe_defaults.pop(__BEFORE_TASKS, [])
+    superliminal_after_tasks = superliminal_pipe_defaults.pop(__AFTER_TASKS, [])
+    merged_pipeline_defaults = merge_dicts(subliminal_pipe_defaults, superliminal_pipe_defaults,
+                                           True)
+    pipeline = merge_dicts(pipeline, merged_pipeline_defaults, True)
+
+    return apply_task_defaults(subliminal, superliminal, pipeline,
+                               superliminal_before_tasks=superliminal_before_tasks,
+                               superliminal_after_tasks=superliminal_after_tasks)
+
+
+def apply_task_defaults(subliminal, superliminal, pipeline, superliminal_before_tasks,
+                        superliminal_after_tasks):
+    """Apply defaults task values on given pipeline
+           :param subliminal: subliminal config
+           :param superliminal: superliminal config
+           :param  pipeline: where 'tasks' list can be found it
+           :param  superliminal_before_tasks: superliminal before subtasks list
+           :param  superliminal_after_tasks: superliminal after subtasks list
+
+           :returns: pipeline after enrich with superliminal.tasks and superliminal.task_defaults
+            and subliminal.task_defaults
+    """
+    pipeline[__TASKS] = __apply_task_defaults(subliminal, superliminal, pipeline.get(__TASKS, []),
+                                              superliminal_before_tasks, superliminal_after_tasks)
+
+    return pipeline
+
+
+def __apply_task_defaults(subliminal,
+                          superliminal,
+                          subliminal_tasks,
+                          superliminal_before_tasks,
+                          superliminal_after_tasks):
+    keyword = "task_defaults"
+    subliminal_tasks_defaults = subliminal.get(keyword, {})
+    superliminal_tasks_defaults = superliminal.get(keyword, {})
+    merged_task_defaults = merge_dicts(subliminal_tasks_defaults, superliminal_tasks_defaults,
+                                       recursive=True)
+
+    return __enrich_tasks(subliminal_tasks, superliminal_before_tasks, superliminal_after_tasks,
+                          merged_task_defaults)
+
+
+def __enrich_tasks(sub_tasks, super_before_tasks, super_after_tasks, task_defaults):
+    merged_tasks = super_before_tasks + sub_tasks + super_after_tasks
+
+    return [
+        merge_dicts(task, task_defaults.get(task.get('type', ''), {}), recursive=True)
+        for task in merged_tasks
+    ]
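The enrichment step at the end of this module can be summarized: pipeline tasks are wrapped by the superliminal's before/after tasks, then each task is merged with the `task_defaults` entry matching its `type`. A standalone sketch using a shallow merge (the real code merges recursively; names here are illustrative):

```python
def enrich_tasks(sub_tasks, before_tasks, after_tasks, task_defaults):
    """Wrap pipeline tasks with super-level tasks, then apply per-type defaults."""
    merged = before_tasks + sub_tasks + after_tasks
    # defaults for the task's type apply first; the task's own keys win
    return [{**task_defaults.get(task.get('type', ''), {}), **task} for task in merged]
```

A task of type `python` thus picks up an `image` from `task_defaults` only if it does not set one itself.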
diff --git a/liminal/core/environment.py b/liminal/core/environment.py
index acea4e4..ab6adaf 100644
--- a/liminal/core/environment.py
+++ b/liminal/core/environment.py
@@ -16,9 +16,9 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import logging
 import os
 import subprocess
-import logging
 
 DEFAULT_DAGS_ZIP_NAME = 'liminal.zip'
 DEFAULT_LIMINAL_HOME = os.path.expanduser('~/liminal_home')
@@ -61,3 +61,9 @@
         logging.info(f'LIMINAL_VERSION not set. Setting it to currently installed version: {value}')
         os.environ[LIMINAL_VERSION_PARAM_NAME] = value
     return os.environ.get(LIMINAL_VERSION_PARAM_NAME, 'apache-liminal')
+
+
+def get_airflow_home_dir():
+    # if we are inside airflow, we will take it from the configured dags folder
+    base_dir = os.environ.get("AIRFLOW__CORE__DAGS_FOLDER", get_liminal_home())
+    return base_dir
diff --git a/liminal/core/util/class_util.py b/liminal/core/util/class_util.py
index 31a9185..57492c9 100644
--- a/liminal/core/util/class_util.py
+++ b/liminal/core/util/class_util.py
@@ -42,8 +42,7 @@
                         subclasses.add(child)
                         break
 
-    result = {sc.__module__ + "." + sc.__name__: sc for sc in subclasses}
-    return result
+    return dict([(sc.__module__.split(".")[-1], sc) for sc in subclasses])
 
 
 def import_module(package, recursive=True):
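The changed return value above keys each discovered implementation by the leaf name of its defining module instead of the fully qualified class name (which is why `get_types_dict` could be deleted from the builder module). A small sketch of that keying; `FakeBuilder` is a stand-in, not a real liminal class:

```python
def types_by_module(subclasses):
    # e.g. a class defined in 'liminal.build.image.python' gets the key 'python'
    return {cls.__module__.split('.')[-1]: cls for cls in subclasses}


class FakeBuilder:  # stand-in for a real ImageBuilder subclass
    pass


FakeBuilder.__module__ = 'liminal.build.image.python'
```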
diff --git a/liminal/core/util/dict_util.py b/liminal/core/util/dict_util.py
new file mode 100644
index 0000000..e20b1a6
--- /dev/null
+++ b/liminal/core/util/dict_util.py
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import re
+from collections import OrderedDict
+
+from sqlalchemy.util import OrderedSet
+
+from liminal.runners.airflow.config import standalone_variable_backend
+
+
+def merge_dicts(dict1, dict2, recursive=False):
+    """
+    :returns: dict1 merged with dict2. Without recursive, dict2 wins on key collisions;
+    with recursive, nested dicts are merged and dict1 wins on leaf collisions.
+    """
+    if not recursive:
+        return {**dict1, **dict2}
+
+    return __merge_dicts(dict1, dict2)
+
+
+def __merge_dicts(dict1, dict2):
+    # recursive merge
+    merged_dicts = OrderedDict()
+    dict_1_keys = dict1.keys()
+    dict_2_keys = dict2.keys()
+    for k in OrderedSet(dict_1_keys).union(dict_2_keys):
+        if k in dict1 and k in dict2:
+            if isinstance(dict1[k], dict) and isinstance(dict2[k], dict):
+                merged_dicts[k] = dict(__merge_dicts(dict1[k], dict2[k]))
+            else:
+                merged_dicts[k] = dict1[k]
+        elif k in dict_1_keys:
+            merged_dicts[k] = dict1[k]
+        else:
+            merged_dicts[k] = dict2[k]
+    return merged_dicts
+
+
+__PLACE_HOLDER_PATTERN = r"{{\s*([a-zA-Z0-9._-]+)\s*}}"
+
+
+def replace_placeholders(dct, variables):
+    """
+    Replace all {{variable.key}} in dct with variable.value variable in variables
+    """
+    return dict(__replace_placeholders(dct, variables))
+
+
+def replace_placholders_in_string(string_value, variables, pattern=__PLACE_HOLDER_PATTERN):
+    return re.sub(pattern, lambda m: __repl(m, variables), string_value, flags=re.IGNORECASE)
+
+
+def __replace_placeholders(dct, variables):
+    dct_items = dct.items()
+    for k, v in dct_items:
+        if isinstance(v, str):
+            yield k, replace_placholders_in_string(v, variables)
+        elif isinstance(v, dict):
+            yield k, dict(__replace_placeholders(v, variables))
+        elif isinstance(v, list):
+            yield k, list(__replace_placeholder_in_list(v, variables))
+        else:
+            yield k, v
+
+
+def __replace_placeholder_in_list(lst, variables):
+    for v in lst:
+        if isinstance(v, str):
+            yield replace_placholders_in_string(v, variables)
+        elif isinstance(v, dict):
+            yield dict(__replace_placeholders(v, variables))
+        elif isinstance(v, list):
+            yield __replace_placeholder_in_list(v, variables)
+        else:
+            yield v
+
+
+def __repl(matched, variables):
+    origin = matched.group(0)
+    key = matched.group(1)
+    return variables.get(key, __try_backend_variables(key, default=origin))
+
+
+def __try_backend_variables(key, default):
+    return standalone_variable_backend.get_variable(key, default)
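The two modes of `merge_dicts` above have opposite collision semantics, which is easy to miss when reading call sites: the shallow merge lets dict2 win, while the recursive merge combines nested dicts and lets dict1 win on leaves. A standalone sketch of those semantics (a reimplementation for illustration, not an import of the module):

```python
def merge_dicts(dict1, dict2, recursive=False):
    if not recursive:
        return {**dict1, **dict2}  # shallow: dict2 wins on collisions
    merged = {}
    for key in list(dict1) + [k for k in dict2 if k not in dict1]:
        both_dicts = (key in dict1 and key in dict2
                      and isinstance(dict1[key], dict) and isinstance(dict2[key], dict))
        if both_dicts:
            merged[key] = merge_dicts(dict1[key], dict2[key], recursive=True)
        elif key in dict1:
            merged[key] = dict1[key]  # recursive: dict1 wins on leaf collisions
        else:
            merged[key] = dict2[key]
    return merged
```

This is why the enrichment code passes the subliminal dict as dict1 when merging recursively: the lower layer's values take precedence over its super's.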
diff --git a/liminal/core/util/files_util.py b/liminal/core/util/files_util.py
index 48ab9b7..798e468 100644
--- a/liminal/core/util/files_util.py
+++ b/liminal/core/util/files_util.py
@@ -16,11 +16,49 @@
 # specific language governing permissions and limitations
 # under the License.
 
-import os
 import logging
+import os
+
+import yaml
+
+# { path -> { name -> config_file } }
+cached_files = dict()
+cached_source_files = dict()
+
+
+def resolve_pipeline_source_file(config_name):
+    """
+    Return the source (file) path of the given config name
+    """
+    return cached_source_files[config_name]
+
+
+def load(path):
+    """
+    :param path: config path
+    :returns dict of {name -> loaded config file} from all liminal.y[a]ml files under given path
+    """
+
+    if cached_files.get(path):
+        return cached_files[path]
+
+    config_entities = {}
+
+    for file_data in find_config_files(path):
+        with open(file_data, 'r') as data:
+            config_file = yaml.safe_load(data)
+            config_entities[config_file['name']] = config_file
+            cached_source_files[config_file['name']] = file_data
+
+    cached_files[path] = config_entities
+    return config_entities
 
 
 def find_config_files(path):
+    """
+    :param path: config path
+    :returns list of all liminal.y[a]ml files under config path
+    """
     files = []
     logging.info(path)
     for r, d, f in os.walk(path):
@@ -29,3 +67,19 @@
                 logging.info(os.path.join(r, file))
                 files.append(os.path.join(r, file))
     return files
+
+
+def dump_liminal_configs(liminal_configs, path):
+    os.makedirs(path, exist_ok=True)
+
+    logging.info(f"Starting to dump liminal configs into {path}")
+
+    for liminal_config in liminal_configs:
+        dump_liminal_config(liminal_config, f'{path}/{liminal_config["name"]}.yml')
+
+
+def dump_liminal_config(liminal_config, file_path):
+    with open(file_path, 'w') as config_file:
+        logging.info(f"Dumping {liminal_config['name']} into {file_path}")
+        yaml.dump(liminal_config, config_file, default_flow_style=False)
diff --git a/liminal/runners/airflow/__init__.py b/liminal/runners/airflow/__init__.py
index d69bec4..e7b2850 100644
--- a/liminal/runners/airflow/__init__.py
+++ b/liminal/runners/airflow/__init__.py
@@ -1,4 +1,4 @@
-#
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,11 +15,31 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+from datetime import datetime
+from unittest.mock import MagicMock
+
+import pytz
+
+
+class DummyDagRun:
+
+    def __init__(self) -> None:
+        self.start_date = pytz.utc.localize(datetime.utcnow())
+        self.conf = None
+        self.run_id = "run_id"
+
+    @staticmethod
+    def get_task_instances():
+        return []
+
+    def get_task_instance(self, _):
+        return self
+
 from datetime import datetime
 
 TASK_ID_SEPARATOR = '.'
 
-
 class DummyDag:
 
     def __init__(self, dag_id, task_id):
@@ -27,9 +47,22 @@
         self.task_id = task_id
         self.try_number = 0
         self.is_subdag = False
+        self.execution_date = '2017-05-21T00:00:00'
+        self.dag_run_id = 'dag_run_id'
+        self.owner = ['owner1', 'owner2']
+        self.email = ['email1@test.com']
+        self.task = MagicMock(name='task', owner=self.owner, email=self.email)
         self.context = {
             'dag': self,
             'task': self,
             'ti': self,
-            'ts': datetime.now().timestamp()
-        }
\ No newline at end of file
+            'ts': datetime.now().timestamp(),
+            'dag_run': DummyDagRun()
+        }
+
+    def get_dagrun(self):
+        return self.context['dag_run']
+
+    @staticmethod
+    def xcom_push(key, value):
+        return key, value
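The test doubles above can be exercised without a scheduler; here is a pytz-free sketch of the same `DummyDagRun` shape (field names copied from the diff, `datetime.timezone` standing in for `pytz.utc`):

```python
from datetime import datetime, timezone


class DummyDagRun:
    """Stand-in dag run: fixed run_id, tz-aware start_date, no conf."""

    def __init__(self):
        self.start_date = datetime.now(timezone.utc)
        self.conf = None
        self.run_id = "run_id"

    @staticmethod
    def get_task_instances():
        return []

    def get_task_instance(self, _):
        # mirrors the double above: the run answers for its own task instances
        return self


dag_run = DummyDagRun()
```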
diff --git a/liminal/runners/airflow/config/standalone_variable_backend.py b/liminal/runners/airflow/config/standalone_variable_backend.py
index b49c7fe..a7f8102 100644
--- a/liminal/runners/airflow/config/standalone_variable_backend.py
+++ b/liminal/runners/airflow/config/standalone_variable_backend.py
@@ -15,19 +15,32 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import logging
 import os
 from os import environ
+from sqlite3 import OperationalError
 
 from airflow.models import Variable
 
 LIMINAL_STAND_ALONE_MODE_KEY = "LIMINAL_STAND_ALONE_MODE"
 
 
+# noinspection PyBroadException
 def get_variable(key, default_val):
     if liminal_local_mode():
         return os.environ.get(key, default_val)
     else:
-        return Variable.get(key, default_var=default_val)
+        try:
+            return Variable.get(key, default_var=default_val)
+        except OperationalError as e:
+            logging.warning(
+                f'Failed to find variable {key} in Airflow variables table.'
+                f' Error: {e.__class__.__module__}.{e.__class__.__name__}'
+            )
+            return default_val
+        except Exception as e:
+            logging.warning(f'Failed to find variable {key} in Airflow variables table. Error: {e}')
+            return default_val
 
 
 def liminal_local_mode():
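The fallback chain in `get_variable` can be sketched without Airflow installed. Below, `airflow_lookup` is a hypothetical stand-in for `airflow.models.Variable.get`, and local mode is assumed to be signalled by a truthy `LIMINAL_STAND_ALONE_MODE` env var (the body of `liminal_local_mode` sits outside this hunk):

```python
import os

LIMINAL_STAND_ALONE_MODE_KEY = "LIMINAL_STAND_ALONE_MODE"


def liminal_local_mode():
    # assumption: a truthy env var flips liminal into stand-alone mode
    return os.environ.get(LIMINAL_STAND_ALONE_MODE_KEY, "false").lower() == "true"


def get_variable(key, default_val, airflow_lookup=None):
    """airflow_lookup stands in for airflow.models.Variable.get."""
    if liminal_local_mode():
        return os.environ.get(key, default_val)
    try:
        return airflow_lookup(key)
    except Exception:
        # any backend failure degrades to the default, as in the diff
        return default_val
```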
diff --git a/liminal/runners/airflow/dag/liminal_dags.py b/liminal/runners/airflow/dag/liminal_dags.py
index 03911b1..be45de0 100644
--- a/liminal/runners/airflow/dag/liminal_dags.py
+++ b/liminal/runners/airflow/dag/liminal_dags.py
@@ -15,105 +15,138 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import logging
+import os
+import traceback
 from datetime import datetime, timedelta
 
-import yaml
 from airflow import DAG
 
-from liminal.core import environment
+from liminal.core import environment as env
+from liminal.core.config.config import ConfigUtil
 from liminal.core.util import class_util
-from liminal.core.util import files_util
 from liminal.runners.airflow.model.task import Task
-from liminal.runners.airflow.tasks.defaults.job_end import JobEndTask
-from liminal.runners.airflow.tasks.defaults.job_start import JobStartTask
-import logging
 
 __DEPENDS_ON_PAST = 'depends_on_past'
 
 
+# noinspection PyBroadException
 def register_dags(configs_path):
     """
     Registers pipelines in liminal yml files found in given path (recursively) as airflow DAGs.
     """
-    logging.info(f'Registering DAG from path: {configs_path}')
-    config_files = files_util.find_config_files(configs_path)
+    logging.info(f'Registering DAGs from path: {configs_path}')
+    config_util = ConfigUtil(configs_path)
+    # TODO - change is_render_variables to False when runtime resolving is available
+    configs = config_util.safe_load(is_render_variables=True)
+
+    if os.getenv('POD_NAMESPACE') != "jenkins":
+        config_util.snapshot_final_liminal_configs()
 
     dags = []
-    logging.info(f'found {len(config_files)} in path: {configs_path}')
-    for config_file in config_files:
-        logging.info(f'Registering DAG for file: {config_file}')
+    logging.info(f'found {len(configs)} liminal configs in path: {configs_path}')
+    for config in configs:
+        name = config['name'] if 'name' in config else None
+        try:
+            if not name:
+                raise ValueError('liminal.yml missing field `name`')
 
-        with open(config_file) as stream:
-            config = yaml.safe_load(stream)
+            logging.info(f"Registering DAGs for {name}")
+
+            owner = config.get('owner')
+
+            trigger_rule = 'all_success'
+            if 'always_run' in config and config['always_run']:
+                trigger_rule = 'all_done'
 
             for pipeline in config['pipelines']:
-                pipeline_name = pipeline['pipeline']
+                default_args = __default_args(pipeline)
+                dag = __initialize_dag(default_args, pipeline, owner)
 
-                default_args = {k: v for k, v in pipeline.items()}
-
-                override_args = {
-                    'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()),
-                    __DEPENDS_ON_PAST: default_args[
-                        __DEPENDS_ON_PAST] if __DEPENDS_ON_PAST in default_args else False,
-                }
-
-                default_args.update(override_args)
-
-                dag = DAG(
-                    dag_id=pipeline_name,
-                    default_args=default_args,
-                    dagrun_timeout=timedelta(minutes=pipeline['timeout_minutes']),
-                    catchup=False
-                )
-
-                job_start_task = JobStartTask(dag, config, pipeline, {}, None, 'all_success')
-                parent = job_start_task.apply_task_to_dag()
-
-                trigger_rule = 'all_success'
-                if 'always_run' in config and config['always_run']:
-                    trigger_rule = 'all_done'
+                parent = None
 
                 for task in pipeline['tasks']:
                     task_type = task['type']
                     task_instance = get_task_class(task_type)(
-                        dag, config, pipeline, task, parent if parent else None, trigger_rule
+                        task_id=task['task'],
+                        dag=dag,
+                        parent=parent,
+                        trigger_rule=trigger_rule,
+                        liminal_config=config,
+                        pipeline_config=pipeline,
+                        task_config=task
                     )
 
                     parent = task_instance.apply_task_to_dag()
 
-                job_end_task = JobEndTask(dag, config, pipeline, {}, parent, 'all_done')
-                job_end_task.apply_task_to_dag()
-
                 logging.info(f'registered DAG {dag.dag_id}: {dag.tasks}')
 
-                globals()[pipeline_name] = dag
+                globals()[pipeline['pipeline']] = dag
                 dags.append(dag)
 
+        except Exception:
+            logging.error(f'Failed to register DAGs for {name}')
+            traceback.print_exc()
+
     return dags
 
 
+def __initialize_dag(default_args, pipeline, owner):
+    pipeline_name = pipeline['pipeline']
+
+    schedule_interval = default_args.get('schedule_interval', None)
+    if not schedule_interval:
+        schedule_interval = default_args.get('schedule', None)
+
+    if owner and 'owner' not in default_args:
+        default_args['owner'] = owner
+
+    start_date = pipeline.get('start_date', datetime.min)
+    if not isinstance(start_date, datetime):
+        start_date = datetime.combine(start_date, datetime.min.time())
+
+    default_args.pop('tasks', None)
+    default_args.pop('schedule', None)
+    default_args.pop('monitoring', None)
+    default_args.pop('schedule_interval', None)
+
+    dag = DAG(
+        dag_id=pipeline_name,
+        default_args=default_args,
+        dagrun_timeout=timedelta(minutes=pipeline['timeout_minutes']),
+        start_date=start_date,
+        schedule_interval=schedule_interval,
+        catchup=False
+    )
+
+    return dag
+
+
+def __default_args(pipeline):
+    default_args = {k: v for k, v in pipeline.items()}
+    override_args = {
+        'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()),
+        __DEPENDS_ON_PAST: default_args.get(__DEPENDS_ON_PAST, False),
+    }
+    default_args.update(override_args)
+    return default_args
+
+
 logging.info(f'Loading task implementations..')
 
 # TODO: add configuration for user tasks package
 impl_packages = 'liminal.runners.airflow.tasks'
 user_task_package = 'TODO: user_tasks_package'
 
+tasks_by_liminal_name = class_util.find_subclasses_in_packages([impl_packages], Task)
 
-def tasks_by_liminal_name(task_classes):
-    return {full_name.replace(impl_packages, '').replace(clzz.__name__, '')[1:-1]: clzz
-            for (full_name, clzz) in task_classes.items()}
-
-
-tasks_by_liminal_name = tasks_by_liminal_name(
-    class_util.find_subclasses_in_packages([impl_packages], Task)
-)
-
-logging.info(f'Finished loading task implementations: {tasks_by_liminal_name}')
+logging.info(f'Finished loading task implementations: {tasks_by_liminal_name.keys()}')
 
 
 def get_task_class(task_type):
     return tasks_by_liminal_name[task_type]
 
 
-register_dags(environment.get_dags_dir())
+register_dags(os.path.join(env.get_airflow_home_dir(), env.DEFAULT_PIPELINES_SUBDIR))
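Two small behaviours of `__initialize_dag` are worth pinning down: `schedule_interval` wins over the shorthand `schedule` key, and a plain `date` start_date is promoted to midnight. A sketch under that reading of the diff:

```python
from datetime import date, datetime


def resolve_schedule(default_args):
    # 'schedule_interval' takes precedence; 'schedule' is the liminal shorthand
    return default_args.get('schedule_interval') or default_args.get('schedule')


def normalize_start_date(start_date):
    # a datetime passes through; a bare date becomes midnight of that day
    if not isinstance(start_date, datetime):
        start_date = datetime.combine(start_date, datetime.min.time())
    return start_date
```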
diff --git a/liminal/runners/airflow/model/dag_mutator.py b/liminal/runners/airflow/model/dag_mutator.py
new file mode 100644
index 0000000..1396fe0
--- /dev/null
+++ b/liminal/runners/airflow/model/dag_mutator.py
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from abc import ABC, abstractmethod
+
+
+class DagMutator(ABC):
+    """
+    Liminal Dag mutator.
+    """
+
+    def __init__(self, liminal_config):
+        self.liminal_config = liminal_config
+
+    @abstractmethod
+    def apply_task_to_dag(self, **kwargs):
+        pass
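The new `DagMutator` base pins the contract every task now implements. A hypothetical subclass showing the shape (the sentinel return value stands in for the Airflow operator a real task would hand to its downstream):

```python
from abc import ABC, abstractmethod


class DagMutator(ABC):
    """Liminal Dag mutator, as introduced above."""

    def __init__(self, liminal_config):
        self.liminal_config = liminal_config

    @abstractmethod
    def apply_task_to_dag(self, **kwargs):
        pass


class NoOpTask(DagMutator):
    # hypothetical: a real task returns the operator downstream tasks chain from
    def apply_task_to_dag(self, **kwargs):
        return ('applied', self.liminal_config.get('name'))
```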
diff --git a/liminal/runners/airflow/model/task.py b/liminal/runners/airflow/model/task.py
index 25a162b..e4539be 100644
--- a/liminal/runners/airflow/model/task.py
+++ b/liminal/runners/airflow/model/task.py
@@ -15,26 +15,26 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
 """
 Base task.
 """
+from abc import ABC
+
+from liminal.runners.airflow.model import dag_mutator
 
 
-class Task:
+class Task(dag_mutator.DagMutator, ABC):
     """
     Task.
     """
 
-    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
+    def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                 task_config):
+        super().__init__(liminal_config)
         self.dag = dag
-        self.liminal_config = liminal_config
         self.pipeline_config = pipeline_config
-        self.task_config = task_config
+        self.task_id = task_id
         self.parent = parent
         self.trigger_rule = trigger_rule
-
-    def apply_task_to_dag(self):
-        """
-        Registers Airflow operator to parent task.
-        """
-        raise NotImplementedError()
+        self.task_config = task_config
diff --git a/liminal/runners/airflow/tasks/create_cloudformation_stack.py b/liminal/runners/airflow/tasks/create_cloudformation_stack.py
index 2084dc7..a5a5f34 100644
--- a/liminal/runners/airflow/tasks/create_cloudformation_stack.py
+++ b/liminal/runners/airflow/tasks/create_cloudformation_stack.py
@@ -24,8 +24,10 @@
     Creates cloud_formation stack.
     """
 
-    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
-        super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+    def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                 task_config):
+        super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                         task_config)
 
     def apply_task_to_dag(self):
         pass
diff --git a/liminal/runners/airflow/tasks/defaults/job_phase_task.py b/liminal/runners/airflow/tasks/defaults/job_phase_task.py
deleted file mode 100644
index ce031f6..0000000
--- a/liminal/runners/airflow/tasks/defaults/job_phase_task.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-"""
-Default base task.
-"""
-from abc import abstractmethod
-
-from liminal.runners.airflow.model.task import Task
-
-
-class JobPhaseTask(Task):
-    """
-    Job phase task. A task that runs automatically at a specific phase in the pipeline.
-    """
-
-    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
-        super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
-        metrics = self.liminal_config.get('metrics', {})
-        self.metrics_namespace = metrics.get('namespace', '')
-        self.metrics_backends = metrics.get('backends', [])
-
-    @abstractmethod
-    def apply_task_to_dag(self):
-        pass
diff --git a/liminal/runners/airflow/tasks/delete_cloudformation_stack.py b/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
index 1a4641c..9115544 100644
--- a/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
+++ b/liminal/runners/airflow/tasks/delete_cloudformation_stack.py
@@ -24,8 +24,10 @@
     Deletes cloud_formation stack.
     """
 
-    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
-        super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+    def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                 task_config):
+        super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                         task_config)
 
     def apply_task_to_dag(self):
         pass
diff --git a/liminal/runners/airflow/tasks/defaults/job_end.py b/liminal/runners/airflow/tasks/job_end.py
similarity index 72%
rename from liminal/runners/airflow/tasks/defaults/job_end.py
rename to liminal/runners/airflow/tasks/job_end.py
index 4bc4e28..57f99f9 100644
--- a/liminal/runners/airflow/tasks/defaults/job_end.py
+++ b/liminal/runners/airflow/tasks/job_end.py
@@ -16,17 +16,22 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from liminal.runners.airflow.model import task
 from liminal.runners.airflow.operators.job_status_operator import JobEndOperator
-from liminal.runners.airflow.tasks.defaults.job_phase_task import JobPhaseTask
 
 
-class JobEndTask(JobPhaseTask):
+class JobEndTask(task.Task):
     """
       Job end task. Reports job end metrics.
     """
 
-    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
-        super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+    def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                 task_config):
+        super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                         task_config)
+        metrics = self.liminal_config.get('metrics', {})
+        self.metrics_namespace = metrics.get('namespace', '')
+        self.metrics_backends = metrics.get('backends', [])
 
     def apply_task_to_dag(self):
         job_end_task = JobEndOperator(
diff --git a/liminal/runners/airflow/tasks/defaults/job_start.py b/liminal/runners/airflow/tasks/job_start.py
similarity index 72%
rename from liminal/runners/airflow/tasks/defaults/job_start.py
rename to liminal/runners/airflow/tasks/job_start.py
index 6797140..f8017e6 100644
--- a/liminal/runners/airflow/tasks/defaults/job_start.py
+++ b/liminal/runners/airflow/tasks/job_start.py
@@ -16,17 +16,22 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from liminal.runners.airflow.model import task
 from liminal.runners.airflow.operators.job_status_operator import JobStartOperator
-from liminal.runners.airflow.tasks.defaults.job_phase_task import JobPhaseTask
 
 
-class JobStartTask(JobPhaseTask):
+class JobStartTask(task.Task):
     """
     Job start task. Reports job start metrics.
     """
 
-    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
-        super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+    def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                 task_config):
+        super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                         task_config)
+        metrics = self.liminal_config.get('metrics', {})
+        self.metrics_namespace = metrics.get('namespace', '')
+        self.metrics_backends = metrics.get('backends', [])
 
     def apply_task_to_dag(self):
         job_start_task = JobStartOperator(
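With `JobPhaseTask` deleted, `JobStartTask` and `JobEndTask` each inline the same metrics bootstrapping. The shared shape, factored out for illustration under a hypothetical helper name:

```python
def extract_metrics(liminal_config):
    # mirrors the three lines duplicated in JobStartTask and JobEndTask:
    # optional 'metrics' section with 'namespace' and 'backends' defaults
    metrics = liminal_config.get('metrics', {})
    return metrics.get('namespace', ''), metrics.get('backends', [])
```

Whether to re-extract this into a mixin is a judgment call; the diff trades the extra base class for two small duplications.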
diff --git a/liminal/runners/airflow/tasks/python.py b/liminal/runners/airflow/tasks/python.py
index 3094720..16758d5 100644
--- a/liminal/runners/airflow/tasks/python.py
+++ b/liminal/runners/airflow/tasks/python.py
@@ -35,8 +35,10 @@
     Python task.
     """
 
-    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
-        super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+    def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                 task_config):
+        super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                         task_config)
         self.task_name = self.task_config['task']
         self.image = self.task_config['image']
         self.volumes = self._volumes()
diff --git a/liminal/runners/airflow/tasks/spark.py b/liminal/runners/airflow/tasks/spark.py
index ace3bc6..a64fefe 100644
--- a/liminal/runners/airflow/tasks/spark.py
+++ b/liminal/runners/airflow/tasks/spark.py
@@ -24,8 +24,10 @@
     Executes a Spark application.
     """
 
-    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
-        super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+    def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                 task_config):
+        super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                         task_config)
 
     def apply_task_to_dag(self):
         pass
diff --git a/liminal/runners/airflow/tasks/sql.py b/liminal/runners/airflow/tasks/sql.py
index 94c4c49..5ec940e 100644
--- a/liminal/runners/airflow/tasks/sql.py
+++ b/liminal/runners/airflow/tasks/sql.py
@@ -24,8 +24,10 @@
     Executes an SQL application.
     """
 
-    def __init__(self, dag, liminal_config, pipeline_config, task_config, parent, trigger_rule):
-        super().__init__(dag, liminal_config, pipeline_config, task_config, parent, trigger_rule)
+    def __init__(self, task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                 task_config):
+        super().__init__(task_id, dag, parent, trigger_rule, liminal_config, pipeline_config,
+                         task_config)
 
     def apply_task_to_dag(self):
         pass
diff --git a/requirements.txt b/requirements.txt
index fd99be7..96b1dbb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,3 +26,17 @@
 kubernetes==12.0.1
 wheel==0.36.2
 termcolor~=1.1.0
+docker-pycreds==0.4.0
+typing==3.7.4.1
+GitPython==3.1.11
+ddtrace==0.37.0
+moto==1.3.14
+diskcache==3.1.1
+croniter==0.3.31
+pytz==2020.5
+pytzdata==2020.1
+freezegun==1.1.0
+statsd>=3.3.0, <4.0
+sqlalchemy~=1.3.15
+jinja2>=2.10.1, <2.11.0
+python-json-logger==2.0.1
diff --git a/tests/runners/airflow/tasks/defaults/__init__.py b/tests/liminal/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/defaults/__init__.py
copy to tests/liminal/__init__.py
diff --git a/tests/runners/airflow/tasks/defaults/__init__.py b/tests/liminal/core/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/defaults/__init__.py
copy to tests/liminal/core/__init__.py
diff --git a/tests/runners/airflow/tasks/defaults/__init__.py b/tests/liminal/core/config/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/defaults/__init__.py
copy to tests/liminal/core/config/__init__.py
diff --git a/tests/runners/airflow/tasks/defaults/__init__.py b/tests/liminal/core/config/defaults/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/defaults/__init__.py
copy to tests/liminal/core/config/defaults/__init__.py
diff --git a/tests/liminal/core/config/defaults/test_apply_variables_substitution.py b/tests/liminal/core/config/defaults/test_apply_variables_substitution.py
new file mode 100644
index 0000000..02a816e
--- /dev/null
+++ b/tests/liminal/core/config/defaults/test_apply_variables_substitution.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+
+from liminal.core.config.defaults import default_configs
+
+
+class TestApplyVariablesSubstitution(TestCase):
+    def test_apply(self):
+        subliminal = {'variables': {
+            'one': 'one_value',
+            'two': 'two_value'
+        }}
+
+        superliminal = {'variables': {
+            'three': 'three_value',
+            'two': 'two_super_value'
+        }}
+
+        expected = {'variables': {
+            'one': 'one_value',
+            'two': 'two_value',
+            'three': 'three_value'
+        }}
+        self.assertEqual(expected, default_configs.apply_variable_substitution(subliminal, superliminal))
+
+    def test_apply_superliminal_is_missing_variables(self):
+        subliminal = {'variables': {
+            'one': 'one_value',
+            'two': 'two_value'
+        }}
+
+        superliminal = {}
+
+        expected = {'variables': {
+            'one': 'one_value',
+            'two': 'two_value'
+        }}
+        self.assertEqual(expected, default_configs.apply_variable_substitution(subliminal, superliminal))
+
+    def test_apply_subliminal_is_missing_variables(self):
+        subliminal = {}
+
+        superliminal = {'variables': {
+            'one': 'one_value',
+            'two': 'two_value'
+        }}
+
+        expected = {'variables': {
+            'one': 'one_value',
+            'two': 'two_value'
+        }}
+        self.assertEqual(expected, default_configs.apply_variable_substitution(subliminal, superliminal))
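The three tests above pin the merge semantics: superliminal supplies defaults, subliminal wins on collisions. A sketch consistent with those expectations (the real implementation lives in `default_configs` and may differ in detail):

```python
def apply_variable_substitution(subliminal, superliminal):
    # superliminal variables are defaults; subliminal overrides on collision
    merged = dict(superliminal.get('variables', {}))
    merged.update(subliminal.get('variables', {}))
    result = dict(subliminal)
    result['variables'] = merged
    return result
```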
diff --git a/tests/liminal/core/config/defaults/test_defaults_pipeline_config.py b/tests/liminal/core/config/defaults/test_defaults_pipeline_config.py
new file mode 100644
index 0000000..f3f5381
--- /dev/null
+++ b/tests/liminal/core/config/defaults/test_defaults_pipeline_config.py
@@ -0,0 +1,122 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+
+from liminal.core.config.defaults import default_configs
+
+
+class TestDefaultsPipelineConfig(TestCase):
+    def test_apply(self):
+        pipeline = {
+            "name": "mypipe",
+            "param": "constant",
+            "tasks": [
+                {
+                    "task": "middle_task",
+                    "type": "python"
+                }
+            ]
+        }
+
+        subliminal = {
+            "name": "my_subliminal_test",
+            "pipelines": [
+                {
+                    "name": "mypipe",
+                    "param": "constant",
+                    "tasks": [
+                        {
+                            "task": "middle_task",
+                            "type": "python",
+                            "env_vars": {
+                                "env1": "env1"
+                            }
+                        }
+                    ]
+                }
+            ],
+            "pipeline_defaults": {
+                "param1": "param1_value"
+            }
+        }
+        superliminal = {
+            "pipeline_defaults": {
+                "param2": "param2super_value",
+                "param3": "param3super_value",
+                "before_tasks": [
+                    {
+                        "task": "first_task",
+                        "type": "python"
+                    }],
+                "after_tasks": [
+                    {
+                        "task": "end_task",
+                        "type": "python"
+                    }
+                ]
+            },
+            "task_defaults": {
+                "python": {
+                    "default_task_super": "default_task_super_value",
+                    "env_vars": {
+                        "env1": "env1super",
+                        "env2": "env2super"
+                    }
+                }
+            }
+        }
+        expected = {
+            "param": "constant",
+            "param3": "param3super_value",
+            "param2": "param2super_value",
+            "tasks": [
+                {
+                    "type": "python",
+                    "env_vars": {
+                        "env1": "env1super",
+                        "env2": "env2super"
+                    },
+                    "task": "first_task",
+                    "default_task_super": "default_task_super_value"
+                },
+                {
+                    "type": "python",
+                    "env_vars": {
+                        "env1": "env1super",
+                        "env2": "env2super"
+                    },
+                    "task": "middle_task",
+                    "default_task_super": "default_task_super_value"
+                },
+                {
+                    "type": "python",
+                    "env_vars": {
+                        "env1": "env1super",
+                        "env2": "env2super"
+                    },
+                    "task": "end_task",
+                    "default_task_super": "default_task_super_value"
+                }
+            ],
+            "name": "mypipe",
+            "param1": "param1_value"
+        }
+
+        self.assertEqual(expected, default_configs.apply_pipeline_defaults(subliminal, superliminal,
+                                                                           pipeline))
diff --git a/tests/liminal/core/config/defaults/test_defaults_service_config.py b/tests/liminal/core/config/defaults/test_defaults_service_config.py
new file mode 100644
index 0000000..13062df
--- /dev/null
+++ b/tests/liminal/core/config/defaults/test_defaults_service_config.py
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+
+from liminal.core.config.defaults import default_configs
+
+
+class TestDefaultsServiceConfig(TestCase):
+    def test_apply(self):
+        subliminal = {
+            "services": [
+                {
+                    "name": "my_python_server",
+                    "type": "python_server",
+                    "image": "default_image"
+                },
+                {
+                    "name": "my_python_server_for_stg",
+                    "type": "python_server",
+                    "image": "test_image"
+                }
+            ]}
+
+        superliminal = {
+            "service_defaults": {
+                "param": "param1",
+                "param2": "param2"
+            }
+        }
+
+        expected = [{'image': 'default_image',
+                     'name': 'my_python_server',
+                     'param': 'param1',
+                     'param2': 'param2',
+                     'type': 'python_server'},
+                    {'image': 'test_image',
+                     'name': 'my_python_server_for_stg',
+                     'param': 'param1',
+                     'param2': 'param2',
+                     'type': 'python_server'}]
+
+        self.assertEqual(expected, default_configs.apply_service_defaults(subliminal, superliminal))
diff --git a/tests/liminal/core/config/defaults/test_defaults_tasks_config.py b/tests/liminal/core/config/defaults/test_defaults_tasks_config.py
new file mode 100644
index 0000000..caa1c45
--- /dev/null
+++ b/tests/liminal/core/config/defaults/test_defaults_tasks_config.py
@@ -0,0 +1,175 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from unittest import TestCase
+
+from liminal.core.config.defaults import default_configs
+
+
+class TestDefaultsTaskConfig(TestCase):
+    def test_apply(self):
+        pipeline = {
+            'pipeline': 'mypipe',
+            "tasks": [
+                {
+                    "task": "middle",
+                    "type": "spark",
+                    "task_param": "task_middle_param"
+                },
+                {
+                    "task": "end",
+                    "type": "python",
+                    "task_param": "task_end_param"
+                }
+            ]
+        }
+
+        subliminal = {'pipelines': [pipeline]}
+
+        superliminal = {
+            "task_defaults": {
+                "python": {
+                    "env_vars": {
+                        "env2": "env2value"
+                    }
+                },
+                "spark": {
+                    "task_param": "task_spark_param",
+                    "executor": "emr"
+                }
+            }
+        }
+
+        expected = {'pipeline': 'mypipe',
+                    'tasks': [{'env_vars': {'env1': 'env1value', 'env2': 'env2value'},
+                               'task': 'start',
+                               'type': 'python'},
+                              {'executor': 'emr',
+                               'task': 'middle',
+                               'task_param': 'task_middle_param',
+                               'type': 'spark'},
+                              {'env_vars': {'env2': 'env2value'},
+                               'task': 'end',
+                               'task_param': 'task_end_param',
+                               'type': 'python'}]}
+        self.assertEqual(expected, default_configs.apply_task_defaults(subliminal,
+                                                                       superliminal,
+                                                                       pipeline=pipeline,
+                                                                       superliminal_before_tasks=[{
+                                                                           "task": "start",
+                                                                           "type": "python",
+                                                                           "env_vars": {
+                                                                               "env1": "env1value"
+                                                                           }
+                                                                       }],
+                                                                       superliminal_after_tasks=[]))
+
+    def test_missing_tasks_from_super(self):
+        pipeline = {
+            'pipeline': 'mypipe',
+            "tasks": [
+                {
+                    "task": "middle",
+                    "type": "spark",
+                    "task_param": "task_middle_param"
+                },
+                {
+                    "task": "end",
+                    "type": "python",
+                    "task_param": "task_end_param"
+                }
+            ]
+        }
+
+        subliminal = {'pipelines': [pipeline]}
+
+        superliminal = {
+            "task_defaults": {
+                "python": {
+                    "env_vars": {
+                        "env2": "env2value"
+                    }
+                },
+                "spark": {
+                    "task_param": "task_spark_param"
+                }
+            }
+        }
+
+        expected = {'pipeline': 'mypipe',
+                    'tasks': [{'task': 'middle',
+                               'task_param': 'task_middle_param',
+                               'type': 'spark'},
+                              {'env_vars': {'env2': 'env2value'},
+                               'task': 'end',
+                               'task_param': 'task_end_param',
+                               'type': 'python'}]}
+
+        self.assertEqual(expected, default_configs.apply_task_defaults(subliminal,
+                                                                       superliminal,
+                                                                       pipeline=pipeline,
+                                                                       superliminal_before_tasks=[],
+                                                                       superliminal_after_tasks=[]))
+
+    def test_missing_tasks_from_sub(self):
+        pipeline = {
+            'pipeline': 'mypipe'
+        }
+
+        subliminal = {'pipelines': [pipeline]}
+
+        superliminal = {
+            "task_defaults": {
+                "python": {
+                    "env_vars": {
+                        "env2": "env2value"
+                    }
+                },
+                "spark": {
+                    "task_param": "task_start_param",
+                }
+            }
+        }
+
+        expected = {'pipeline': 'mypipe',
+                    'tasks': [{
+                        'task': 'start',
+                        'task_param': 'task_middle_param',
+                        'type': 'spark'},
+                        {'env_vars': {'env2': 'env2value'},
+                         'task': 'end',
+                         'task_param': 'task_end_param',
+                         'type': 'end'}]}
+
+        self.assertEqual(expected,
+                         default_configs.apply_task_defaults(subliminal,
+                                                             superliminal,
+                                                             pipeline=pipeline,
+                                                             superliminal_before_tasks=[{
+                                                                 "task": "start",
+                                                                 "task_param": "task_middle_param",
+                                                                 "type": "spark"
+                                                             }],
+                                                             superliminal_after_tasks=[{
+                                                                 "env_vars": {
+                                                                     "env2": "env2value"
+                                                                 },
+                                                                 "task": "end",
+                                                                 "task_param": "task_end_param",
+                                                                 "type": "end"
+                                                             }]))
diff --git a/tests/liminal/core/config/test_config.py b/tests/liminal/core/config/test_config.py
new file mode 100644
index 0000000..a4d92b3
--- /dev/null
+++ b/tests/liminal/core/config/test_config.py
@@ -0,0 +1,441 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from unittest import TestCase, mock
+
+from liminal.core.config.config import ConfigUtil
+
+
+# noinspection PyUnresolvedReferences,DuplicatedCode
+class TestHierarchicalConfig(TestCase):
+
+    @mock.patch("liminal.core.util.files_util.load")
+    def test_safe_load(self, find_config_files_mock):
+        subliminal = {
+            "name": "my_subliminal_test",
+            "type": "sub",
+            "super": "my_superliminal_test",
+            "pipelines": [
+                {"name": "mypipe1", "param": "constant"},
+                {"name": "mypipe2", "param": "constant"}
+            ],
+            "pipeline_defaults": {
+                "param1": "param1_value"
+            },
+            "task_defaults": {
+                "job_start": {
+                    "task_sub_def": "task_sub_def_value"
+                }
+            }
+        }
+        superliminal = {
+            "name": "my_superliminal_test",
+            "type": "super",
+            "super": "super_superliminal",
+            "pipeline_defaults": {
+                "param2": "param2super_value",
+                "param3": "param3super_value"
+            },
+            "task_defaults": {
+                "job_start": {
+                    "task_def1": "task_def1_value",
+                    "task_def2": {
+                        "task_def2_1": "task_def2_1_value",
+                    }
+                }
+            }
+        }
+        super_superliminal = {
+            "name": "super_superliminal",
+            "type": "super",
+            "pipeline_defaults": {
+                "param2": "param2super_value",
+                "param3": "param3hyper_value",
+                "param4": "param4hyper_value"
+            }
+        }
+
+        expected = [{'name': 'my_subliminal_test',
+                     'pipeline_defaults': {'param1': 'param1_value'},
+                     'pipelines': [{'description': 'add defaults parameters for all pipelines',
+                                    'name': 'mypipe1',
+                                    'param': 'constant',
+                                    'param1': 'param1_value',
+                                    'param2': 'param2super_value',
+                                    'param3': 'param3super_value',
+                                    'param4': 'param4hyper_value',
+                                    'tasks': [{'task': 'start',
+                                               'task_def1': 'task_def1_value',
+                                               'task_def2': {'task_def2_1': 'task_def2_1_value'},
+                                               'task_sub_def': 'task_sub_def_value',
+                                               'type': 'job_start'},
+                                              {'task': 'end', 'type': 'job_end'}]},
+                                   {'description': 'add defaults parameters for all pipelines',
+                                    'name': 'mypipe2',
+                                    'param': 'constant',
+                                    'param1': 'param1_value',
+                                    'param2': 'param2super_value',
+                                    'param3': 'param3super_value',
+                                    'param4': 'param4hyper_value',
+                                    'tasks': [{'task': 'start',
+                                               'task_def1': 'task_def1_value',
+                                               'task_def2': {'task_def2_1': 'task_def2_1_value'},
+                                               'task_sub_def': 'task_sub_def_value',
+                                               'type': 'job_start'},
+                                              {'task': 'end', 'type': 'job_end'}]}],
+                     'service_defaults': {'description': 'add defaults parameters for all '
+                                                         'services'},
+                     'services': [],
+                     'super': 'my_superliminal_test',
+                     'task_defaults': {'job_start': {'task_sub_def': 'task_sub_def_value'}},
+                     'type': 'sub'}]
+
+        find_config_files_mock.return_value = {
+            "my_subliminal_test": subliminal,
+            "my_superliminal_test": superliminal,
+            "super_superliminal": super_superliminal
+        }
+
+        config_util = ConfigUtil("")
+
+        self.assertEqual(expected, config_util.safe_load(is_render_variables=True))
+
+        # validate cache
+        self.assertEqual(expected, config_util.loaded_subliminals)
+
+    @mock.patch("liminal.core.util.files_util.load")
+    def test_get_config(self, find_config_files_mock):
+        find_config_files_mock.return_value = {
+            "my_subliminal_test": {
+                "type": "sub"
+            },
+            "my_superliminal_test": {
+                "type": "super"
+            }
+        }
+
+        config_util = ConfigUtil("")
+
+        self.assertEqual({"type": "sub"},
+                         config_util._ConfigUtil__get_config("my_subliminal_test"))
+
+        self.assertEqual({"type": "super"},
+                         config_util._ConfigUtil__get_config("my_superliminal_test"))
+
+    @mock.patch("liminal.core.util.files_util.load")
+    def test_get_superliminal(self, find_config_files_mock):
+        base = {'name': 'base',
+                'pipeline_defaults': {'after_tasks': [{'task': 'end', 'type': 'job_end'}],
+                                      'before_tasks': [{'task': 'start', 'type': 'job_start'}],
+                                      'description': 'add defaults parameters for all '
+                                                     'pipelines'},
+                'service_defaults': {'description': 'add defaults parameters for all '
+                                                    'services'},
+                'task_defaults': {'description': 'add defaults parameters for all tasks '
+                                                 'separate by task type'},
+                'type': 'super'}
+        subliminal = {
+            "name": "subliminal_test",
+            "type": "sub"
+        }
+
+        find_config_files_mock.return_value = {
+            "subliminal_test": subliminal
+        }
+
+        config_util = ConfigUtil("")
+
+        self.assertEqual(base,
+                         config_util._ConfigUtil__get_superliminal(subliminal))
+
+        self.assertEqual({},
+                         config_util._ConfigUtil__get_superliminal(base))
+
+        liminal = {
+            "name": "subliminal_test",
+            "type": "sub",
+            "super": "my_superliminal"
+        }
+
+        with self.assertRaises(FileNotFoundError):
+            config_util._ConfigUtil__get_superliminal(liminal)
+
+    @mock.patch("liminal.core.util.files_util.load")
+    def test_merge_superliminals(self, find_config_files_mock):
+        superliminal = {
+            "name": "my_superliminal_test",
+            "type": "super",
+            "super": "super_superliminal",
+            "pipeline_defaults": {
+                "before_tasks": [
+                    {"task": "start-2", "type": "spark"},
+                ],
+                "after_tasks": [
+                    {"task": "end-1", "type": "spark"}
+                ]
+            },
+            "task_defaults": {
+                "task_def1": "task_def1_value"
+            }
+        }
+
+        super_superliminal = {
+            "name": "super_superliminal",
+            "type": "super",
+            "pipeline_defaults": {
+                "before_tasks": [
+                    {"task": "start-1", "type": "spark"}],
+                "after_tasks": [
+                    {"task": "end-2", "type": "spark"}
+                ]
+            }
+        }
+
+        config_util = ConfigUtil("")
+
+        find_config_files_mock.return_value = {
+            "super_superliminal": super_superliminal,
+            "superliminal": superliminal
+        }
+
+        expected = {'name': 'my_superliminal_test',
+                    'pipeline_defaults': {'after_tasks': [{'task': 'end-1', 'type': 'spark'},
+                                                          {'task': 'end-2', 'type': 'spark'}],
+                                          'before_tasks': [{'task': 'start-1', 'type': 'spark'},
+                                                           {'task': 'start-2', 'type': 'spark'}]},
+                    'super': 'super_superliminal',
+                    'task_defaults': {'task_def1': 'task_def1_value'},
+                    'type': 'super'}
+
+        self.assertEqual(expected,
+                         dict(config_util._ConfigUtil__merge_superliminals(superliminal,
+                                                                           super_superliminal)))
+
+    @mock.patch("liminal.core.util.files_util.load")
+    @mock.patch.dict(os.environ, {'env': 'myenv', 'LIMINAL_STAND_ALONE_MODE': 'True'})
+    def test_safe_load_with_variables(self, find_config_files_mock):
+        subliminal = {
+            "name": "my_subliminal_test",
+            "type": "sub",
+            "super": "my_superliminal_test",
+            "variables": {
+                "var": "simple case",
+                "var-2": "-case",
+                "var_2": "_case",
+                "image": "prod image",
+                "a": "{{env}}1",
+                "b": "{{a}}2",
+                "c": "{{a}}{{b}}2"
+            },
+            "pipelines": [
+                {"name": "mypipe1", "param": "{{var}}",
+                 "tasks": [
+                     {'task': 'sub_tasks',
+                      'type': 'dummy'},
+                 ]},
+                {"name": "mypipe2", "param": "{{var-2   }}", "tasks": [
+                    {'task': 'sub_tasks',
+                     'type': 'dummy'},
+                ]}
+            ],
+            "pipeline_defaults": {
+                "param1": "{{var-2}}"
+            },
+            "task_defaults": {
+                "job_start": {
+                    "task_def1:": "task_sub_def_value"
+                }
+
+            },
+            "services": [
+                {
+                    "name": "my_python_server",
+                    "type": "python_server",
+                    "image": "{{image}}"
+                },
+                {
+                    "name": "my_python_server_for_stg",
+                    "type": "python_server",
+                    "image": "{{default_image}}"
+                }
+            ]}
+
+        superliminal = {
+            "name": "my_superliminal_test",
+            "type": "super",
+            "variables": {
+                "var-2": "override",
+                "var3": "super_var",
+                "default_image": "default_image_value",
+                "image": "default_image_value"
+            },
+            "super": "super_superliminal",
+            "pipeline_defaults": {
+                "param2": "{{pipe-var}}",
+                "param3": "param3super_value",
+                "before_tasks": [
+                    {'task': 'second_task', 'type': 'dummy'},
+                ]
+            },
+            "task_defaults": {
+                "pipeline": {
+                    "path": "{{var-2}}",
+                    "task_def1": "task_def1_value",
+                    "task_def2": {
+                        "task_def2_1": "task_def2_1_value",
+                    }
+                }
+            }
+        }
+        super_superliminal = {
+            "name": "super_superliminal",
+            "type": "super",
+            "variables": {
+                "default_image": "def_default_image_value"
+            },
+            "pipeline_defaults": {
+                "global_conf": "{{var3}}",
+                "param2": "param2super_value",
+                "param3": "param3hyper_value",
+                "param4": "param4hyper_value",
+                "after_tasks": [
+                    {'task': 'before_last_task', 'type': 'dummy'},
+                ]
+            }
+        }
+
+        expected = [{'name': 'my_subliminal_test',
+                     'pipeline_defaults': {'param1': '-case'},
+                     'pipelines': [{'description': 'add defaults parameters for all pipelines',
+                                    'global_conf': 'super_var',
+                                    'name': 'mypipe1',
+                                    'param': 'simple case',
+                                    'param1': '-case',
+                                    'param2': '{{pipe-var}}',
+                                    'param3': 'param3super_value',
+                                    'param4': 'param4hyper_value',
+                                    'tasks': [{'task': 'start',
+                                               'task_def1:': 'task_sub_def_value',
+                                               'type': 'job_start'},
+                                              {'task': 'second_task', 'type': 'dummy'},
+                                              {'task': 'sub_tasks', 'type': 'dummy'},
+                                              {'task': 'before_last_task', 'type': 'dummy'},
+                                              {'task': 'end', 'type': 'job_end'}]},
+                                   {'description': 'add defaults parameters for all pipelines',
+                                    'global_conf': 'super_var',
+                                    'name': 'mypipe2',
+                                    'param': '-case',
+                                    'param1': '-case',
+                                    'param2': '{{pipe-var}}',
+                                    'param3': 'param3super_value',
+                                    'param4': 'param4hyper_value',
+                                    'tasks': [{'task': 'start',
+                                               'task_def1:': 'task_sub_def_value',
+                                               'type': 'job_start'},
+                                              {'task': 'second_task', 'type': 'dummy'},
+                                              {'task': 'sub_tasks', 'type': 'dummy'},
+                                              {'task': 'before_last_task', 'type': 'dummy'},
+                                              {'task': 'end', 'type': 'job_end'}]}],
+                     'service_defaults': {'description': 'add defaults parameters for all '
+                                                         'services'},
+                     'services': [{'description': 'add defaults parameters for all services',
+                                   'image': 'prod image',
+                                   'name': 'my_python_server',
+                                   'type': 'python_server'},
+                                  {'description': 'add defaults parameters for all services',
+                                   'image': 'default_image_value',
+                                   'name': 'my_python_server_for_stg',
+                                   'type': 'python_server'}],
+                     'super': 'my_superliminal_test',
+                     'task_defaults': {'job_start': {'task_def1:': 'task_sub_def_value'}},
+                     'type': 'sub',
+                     'variables': {'a': 'myenv1',
+                                   'b': 'myenv12',
+                                   'c': 'myenv1myenv122',
+                                   'image': 'prod image',
+                                   'var': 'simple case',
+                                   'var-2': '-case',
+                                   'var_2': '_case'}}]
+
+        find_config_files_mock.return_value = {
+            "my_subliminal_test": subliminal,
+            "my_superliminal_test": superliminal,
+            "super_superliminal": super_superliminal
+        }
+
+        config_util = ConfigUtil("")
+
+        self.assertEqual(expected, config_util.safe_load(is_render_variables=True))
+
+        # validate cache
+        self.assertEqual(expected, config_util.loaded_subliminals)
+
+    @mock.patch('os.path.exists')
+    @mock.patch("liminal.core.environment.get_airflow_home_dir")
+    @mock.patch("liminal.core.util.files_util.load")
+    @mock.patch.dict(os.environ, {'LIMINAL_STAND_ALONE_MODE': 'True', 'POD_NAMESPACE': 'my_pod_ns'})
+    def test_liminal_config_snapshot(self, find_config_files_mock,
+                                     get_airflow_dir_mock, path_exists_mock):
+        subliminal = {
+            "name": "my_subliminal_test",
+            "type": "sub",
+            "variables": {
+                "var": 1,
+                "var-2": True
+            },
+            "pipelines": [
+                {"name": "mypipe1", "param": "{{var}}"},
+                {"name": "mypipe2", "param": "{{var-2   }}"}
+            ]
+        }
+
+        expected = {'name': 'my_subliminal_test', 'type': 'sub',
+                    'service_defaults': {'description': 'add defaults parameters for all services'},
+                    'task_defaults': {
+                        'description': 'add defaults parameters for all tasks separate by task type'},
+                    'pipeline_defaults': {
+                        'description': 'add defaults parameters for all pipelines',
+                        'before_tasks': [{'task': 'start', 'type': 'job_start'}],
+                        'after_tasks': [{'task': 'end', 'type': 'job_end'}]},
+                    'variables': {'var': 1, 'var-2': True}, 'pipelines': [
+                {'name': 'mypipe1', 'param': '1',
+                 'description': 'add defaults parameters for all pipelines',
+                 'tasks': [{'task': 'start', 'type': 'job_start'},
+                           {'task': 'end', 'type': 'job_end'}]},
+                {'name': 'mypipe2', 'param': 'True',
+                 'description': 'add defaults parameters for all pipelines',
+                 'tasks': [{'task': 'start', 'type': 'job_start'},
+                           {'task': 'end', 'type': 'job_end'}]}], 'services': []}
+
+        find_config_files_mock.return_value = {
+            "my_subliminal_test": subliminal
+        }
+
+        get_airflow_dir_mock.return_value = "/tmp"
+        path_exists_mock.return_value = True
+
+        with mock.patch("builtins.open", mock.mock_open()) as m:
+            with mock.patch("yaml.dump") as ydm:
+                config_util = ConfigUtil("")
+                config_util.safe_load(is_render_variables=True)
+                config_util.snapshot_final_liminal_configs()
+
+                m.assert_called_once_with(
+                    os.path.join('/tmp', '../liminal_config_files/my_subliminal_test.yml'), 'w')
+                ydm.assert_called_once_with(expected, m.return_value, default_flow_style=False)
diff --git a/tests/runners/airflow/tasks/defaults/__init__.py b/tests/liminal/core/util/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/defaults/__init__.py
copy to tests/liminal/core/util/__init__.py
diff --git a/tests/liminal/core/util/test_dict_utils.py b/tests/liminal/core/util/test_dict_utils.py
new file mode 100644
index 0000000..e5c2b10
--- /dev/null
+++ b/tests/liminal/core/util/test_dict_utils.py
@@ -0,0 +1,236 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+from unittest import TestCase, mock
+
+from liminal.core.util import dict_util
+
+
+class TestDictUtils(TestCase):
+    def setUp(self) -> None:
+        self.dict1 = {
+            "env": "env1",
+            "env_dict":
+                {
+                    "env3": "env3"
+                },
+            "env4": "env4"
+        }
+
+        self.dict2 = {
+            "env": "env1",
+            "env_dict":
+                {
+                    "env2": "env2"
+                }
+        }
+
+    def test_merge_dicts(self):
+        expected = {
+            "env": "env1",
+            "env4": "env4",
+            "env_dict":
+                {
+                    "env2": "env2"
+                }
+        }
+
+        self.assertEqual(expected, dict_util.merge_dicts(self.dict1, self.dict2))
+
+    def test_recursive_merge_dicts(self):
+        expected = {
+            "env": "env1",
+            "env4": "env4",
+            "env_dict": {
+                "env2": "env2",
+                "env3": "env3"
+            }
+        }
+        self.assertEqual(expected, dict_util.merge_dicts(self.dict1, self.dict2, True))
+
+    def test_merge_with_empty(self):
+        self.assertEqual(self.dict2, dict_util.merge_dicts({}, self.dict2, True))
+        self.assertEqual(self.dict2, dict_util.merge_dicts({}, self.dict2))
+
+        self.assertEqual(self.dict2, dict_util.merge_dicts(self.dict2, {}, True))
+        self.assertEqual(self.dict2, dict_util.merge_dicts(self.dict2, {}))
+
+    def test_replace_variables_simple_case(self):
+        dct = {
+            "env": "{{env_var}}",
+            "env_dict":
+                {
+                    "env3": "{{ var1 }}"
+                },
+            "env4": "{{var2 }}",
+            "env5": "{{{var2}}",
+            "env6": "{{var3}}"
+        }
+
+        variables = {
+            "env_var": "env value",
+            "var1": "value1",
+            "var2": "value2"
+        }
+
+        expected = {
+            "env": "env value",
+            "env_dict": {
+                "env3": "value1"
+            },
+            "env4": "value2",
+            "env5": "{value2",
+            "env6": "{{var3}}"
+        }
+
+        self.assertEqual(expected, dict_util.replace_placeholders(dct, variables))
+
+    def test_replace_variables_empty_var(self):
+        dct = {
+            "env": "{{env_var}}",
+            "env_dict": {
+                "env3": "{{ var1 }}"
+            },
+            "env4": "{{var2 }}",
+            "env5": "{{{var2}}",
+            "env6": "{{var3}}"
+        }
+        self.assertEqual(dct, dict_util.replace_placeholders(dct, {}))
+
+    @mock.patch.dict(os.environ, {"LIMINAL_STAND_ALONE_MODE": "False"})
+    @mock.patch('airflow.models.Variable.get')
+    def test_replace_variables_flat_replace(self, airflow_variable_mock):
+        def airflow_variable_values(key, default_var):
+            return 'liminal playground' if key == 'playground' else default_var
+
+        airflow_variable_mock.side_effect = airflow_variable_values
+
+        dct = {
+            "query": "select * from my_table "
+                     "where event_type = {{event_type}} and region = {{region}}",
+            "env": "{{prod}}, {{stg}}, {{playground}}",
+            "optional": "{{optionals}}"
+        }
+
+        variables = {
+            "region": "us_east_1",
+            "event_type": "subscription",
+            "prod": "liminal production",
+            "stg": "liminal staging"
+        }
+
+        expected = {
+            'query': 'select * from my_table '
+                     'where event_type = subscription and region = us_east_1',
+            "env": "liminal production, liminal staging, liminal playground",
+            "optional": "{{optionals}}"}
+
+        self.assertEqual(expected, dict_util.replace_placeholders(dct, variables))
+
+    @mock.patch.dict(os.environ, {"LIMINAL_STAND_ALONE_MODE": "False"})
+    @mock.patch('airflow.models.Variable.get')
+    def test_replace_variables_with_nested_list(self, airflow_variable_mock):
+        def airflow_variable_values(key, default_var):
+            return 'liminal playground' if key == 'playground' else default_var
+
+        airflow_variable_mock.side_effect = airflow_variable_values
+
+        dct = {
+            "query": "select * from my_table "
+                     "where event_type = {{event_type}} and region = {{region}}",
+            "env": ['{{prod}}', '{{stg}}', '{{playground}}'],
+            "tasks": [
+                {
+                    'id': 'id1',
+                    'image': '{{image}}'
+                }
+            ],
+            "optional": "{{optionals}}"
+        }
+
+        variables = {
+            "region": "us_east_1",
+            "event_type": "subscription",
+            "prod": "liminal production",
+            "stg": "liminal staging",
+            "image": "my_image_name"
+        }
+
+        expected = {'env': ['liminal production', 'liminal staging', 'liminal playground'],
+                    'optional': '{{optionals}}',
+                    'query': 'select * from my_table where event_type = subscription and region = '
+                             'us_east_1',
+                    'tasks': [{'id': 'id1', 'image': 'my_image_name'}]}
+
+        self.assertEqual(expected, dict_util.replace_placeholders(dct, variables))
+
+    @mock.patch.dict(os.environ, {"table": "my_table", "LIMINAL_STAND_ALONE_MODE": "True"})
+    def test_replace_variables_from_env(self):
+        dct = {
+            "query": "select * from my_table "
+                     "where event_type = {{event_type}} and region = {{region}} from {{table}}"
+        }
+
+        variables = {}
+
+        expected = {'query': 'select * from my_table '
+                             'where event_type = {{event_type}} '
+                             'and region = {{region}} from my_table'}
+
+        self.assertEqual(expected, dict_util.replace_placeholders(dct, variables))
+
+    @mock.patch.dict(os.environ, {"table": "my_table", "LIMINAL_STAND_ALONE_MODE": "True"})
+    def test_replace_variables_from_variable_and_not_env(self):
+        dct = {
+            "query": "select * from my_table "
+                     "where event_type = {{event_type}} and region = {{region}} from {{table}}"
+        }
+
+        variables = {
+            "table": "my_variable_table"
+        }
+
+        expected = {'query': 'select * from my_table '
+                             'where event_type = {{event_type}} '
+                             'and region = {{region}} from my_variable_table'}
+
+        self.assertEqual(expected, dict_util.replace_placeholders(dct, variables))
+
+    @mock.patch.dict(os.environ, {"table": "my_table", "LIMINAL_STAND_ALONE_MODE": "False"})
+    @mock.patch('airflow.models.Variable.get')
+    def test_replace_variables_from_airflow_and_not_env(self, airflow_variable_mock):
+        def airflow_variable_values(key, default_var):
+            return 'my_airflow_table' if key == 'table' else default_var
+
+        airflow_variable_mock.side_effect = airflow_variable_values
+
+        dct = {
+            "query": "select * from my_table "
+                     "where event_type = {{event_type}} and region = {{region}} from {{table}}"
+        }
+
+        variables = {}
+
+        expected = {'query': 'select * from my_table '
+                             'where event_type = {{event_type}} '
+                             'and region = {{region}} from my_airflow_table'}
+
+        self.assertEqual(expected, dict_util.replace_placeholders(dct, variables))
diff --git a/tests/runners/airflow/build/http/python/test_python_server_image_builder.py b/tests/runners/airflow/build/http/python/test_python_server_image_builder.py
index 28d048d..88f9663 100644
--- a/tests/runners/airflow/build/http/python/test_python_server_image_builder.py
+++ b/tests/runners/airflow/build/http/python/test_python_server_image_builder.py
@@ -54,9 +54,9 @@
     def test_build_python_server_with_pip_conf(self):
         build_out = self.__test_build_python_server(use_pip_conf=True)
 
-        # self.assertTrue(
-        #     'RUN --mount=type=secret,id=pip_config,dst=/etc/pip.conf  pip install' in build_out,
-        #     'Incorrect pip command')
+        self.assertTrue(
+            'RUN --mount=type=secret,id=pip_config,dst=/etc/pip.conf  pip install' in build_out,
+            'Incorrect pip command')
 
     def __test_build_python_server(self, use_pip_conf=False,
                                    python_version=None):
diff --git a/tests/runners/airflow/build/python/test_python_image_builder.py b/tests/runners/airflow/build/python/test_python_image_builder.py
index 305d7f9..1046a25 100644
--- a/tests/runners/airflow/build/python/test_python_image_builder.py
+++ b/tests/runners/airflow/build/python/test_python_image_builder.py
@@ -54,9 +54,9 @@
     def test_build_with_pip_conf(self):
         build_out = self.__test_build(use_pip_conf=True)
 
-        # self.assertTrue(
-        #     'RUN --mount=type=secret,id=pip_config,dst=/etc/pip.conf  pip install' in build_out,
-        #     'Incorrect pip command')
+        self.assertTrue(
+            'RUN --mount=type=secret,id=pip_config,dst=/etc/pip.conf  pip install' in build_out,
+            'Incorrect pip command')
 
         self.__test_image()
 
diff --git a/tests/runners/airflow/dag/test_liminal_dags.py b/tests/runners/airflow/dag/test_liminal_dags.py
index c12bb3b..a129767 100644
--- a/tests/runners/airflow/dag/test_liminal_dags.py
+++ b/tests/runners/airflow/dag/test_liminal_dags.py
@@ -18,8 +18,9 @@
 
 import os
 import unittest
-from unittest import TestCase
+from unittest import TestCase, mock
 
+from liminal.core.config.config import ConfigUtil
 from liminal.runners.airflow.dag import liminal_dags
 from liminal.runners.airflow.operators.job_status_operator import JobEndOperator, JobStartOperator
 
@@ -59,8 +60,10 @@
         self.assertIn('default_object_loaded', keys)
 
     @staticmethod
-    def get_register_dags():
-        base_path = os.path.join(os.path.dirname(__file__), '../liminal')
+    @mock.patch.object(ConfigUtil, "snapshot_final_liminal_configs")
+    def get_register_dags(mock_snapshot_final_liminal_configs):
+        mock_snapshot_final_liminal_configs.side_effect = None
+        base_path = os.path.join(os.path.dirname(__file__), '../../apps/test_app')
         return liminal_dags.register_dags(base_path)
 
 
diff --git a/tests/runners/airflow/tasks/defaults/test_job_end.py b/tests/runners/airflow/tasks/test_job_end.py
similarity index 67%
rename from tests/runners/airflow/tasks/defaults/test_job_end.py
rename to tests/runners/airflow/tasks/test_job_end.py
index 1a6e195..d3e1444 100644
--- a/tests/runners/airflow/tasks/defaults/test_job_end.py
+++ b/tests/runners/airflow/tasks/test_job_end.py
@@ -19,7 +19,7 @@
 import unittest
 from unittest import TestCase
 
-from liminal.runners.airflow.tasks.defaults import job_end
+from liminal.runners.airflow.tasks import job_end
 from tests.util import dag_test_utils
 
 
@@ -27,16 +27,16 @@
 class TestJobEndTask(TestCase):
 
     def test_apply_task_to_dag(self):
-
         dag = dag_test_utils.create_dag()
 
         task0 = job_end.JobEndTask(
-            dag,
-            {'metrics': {'namespace': 'EndJobNameSpace', 'backends': ['cloudwatch']}},
-            {'pipeline': 'my_end_pipeline'},
-            {},
-            None,
-            'all_done'
+            task_id='job_end',
+            dag=dag,
+            pipeline_config={'pipeline': 'my_end_pipeline'},
+            task_config={},
+            parent=None,
+            trigger_rule='all_done',
+            liminal_config={'metrics': {'namespace': 'EndJobNameSpace', 'backends': ['cloudwatch']}}
         )
         task0.apply_task_to_dag()
 
@@ -52,7 +52,9 @@
         conf = {'pipeline': 'my_pipeline'}
         dag = dag_test_utils.create_dag()
 
-        task0 = job_end.JobEndTask(dag, {}, {'pipeline': 'my_end_pipeline'}, conf, None, 'all_done')
+        task0 = job_end.JobEndTask(task_id="job_end", dag=dag, liminal_config=conf,
+                                   pipeline_config={'pipeline': 'my_end_pipeline'},
+                                   parent=None, trigger_rule='all_done', task_config={})
         task0.apply_task_to_dag()
 
         self.assertEqual(len(dag.tasks), 1)
@@ -65,12 +67,13 @@
     def test_apply_task_to_dag_with_partial_configuration(self):
         dag = dag_test_utils.create_dag()
 
-        task0 = job_end.JobEndTask(dag,
-                                   {'metrics': {'namespace': 'EndJobNameSpace'}},
-                                   {'pipeline': 'my_end_pipeline'},
-                                   {},
-                                   None,
-                                   'all_done')
+        task0 = job_end.JobEndTask(task_id="job_end",
+                                   dag=dag,
+                                   liminal_config={'metrics': {'namespace': 'EndJobNameSpace'}},
+                                   pipeline_config={'pipeline': 'my_end_pipeline'},
+                                   task_config={},
+                                   parent=None,
+                                   trigger_rule='all_done')
         task0.apply_task_to_dag()
 
         self.assertEqual(len(dag.tasks), 1)
diff --git a/tests/runners/airflow/tasks/defaults/test_job_start.py b/tests/runners/airflow/tasks/test_job_start.py
similarity index 65%
rename from tests/runners/airflow/tasks/defaults/test_job_start.py
rename to tests/runners/airflow/tasks/test_job_start.py
index 78ed3ec..b799544 100644
--- a/tests/runners/airflow/tasks/defaults/test_job_start.py
+++ b/tests/runners/airflow/tasks/test_job_start.py
@@ -19,7 +19,7 @@
 import unittest
 from unittest import TestCase
 
-from liminal.runners.airflow.tasks.defaults import job_start
+from liminal.runners.airflow.tasks import job_start
 from tests.util import dag_test_utils
 
 
@@ -30,12 +30,13 @@
         dag = dag_test_utils.create_dag()
 
         task0 = job_start.JobStartTask(
-            dag,
-            {'metrics': {'namespace': 'StartJobNameSpace', 'backends': ['cloudwatch']}},
-            {'pipeline': 'my_start_pipeline'},
-            {},
-            None,
-            'all_success'
+            task_id="start_task",
+            dag=dag,
+            liminal_config={'metrics': {'namespace': 'StartJobNameSpace', 'backends': ['cloudwatch']}},
+            pipeline_config={'pipeline': 'my_start_pipeline'},
+            task_config={},
+            parent=None,
+            trigger_rule='all_success'
         )
         task0.apply_task_to_dag()
 
@@ -52,12 +53,14 @@
 
         dag = dag_test_utils.create_dag()
 
-        task0 = job_start.JobStartTask(dag,
-                                       {},
-                                       {'pipeline': 'my_end_pipeline'},
-                                       conf,
-                                       None,
-                                       'all_success')
+        task0 = job_start.JobStartTask(
+            task_id="start_task",
+            dag=dag,
+            liminal_config=conf,
+            task_config={},
+            pipeline_config={'pipeline': 'my_end_pipeline'},
+            parent=None,
+            trigger_rule='all_success')
         task0.apply_task_to_dag()
 
         self.assertEqual(len(dag.tasks), 1)
@@ -70,12 +73,13 @@
     def test_apply_task_to_dag_with_partial_configuration(self):
         dag = dag_test_utils.create_dag()
 
-        task0 = job_start.JobStartTask(dag,
-                                       {'metrics': {'namespace': 'StartJobNameSpace'}},
-                                       {'pipeline': 'my_start_pipeline'},
-                                       {},
-                                       None,
-                                       'all_success')
+        task0 = job_start.JobStartTask(task_id="start_task",
+                                       dag=dag,
+                                       liminal_config={'metrics': {'namespace': 'StartJobNameSpace'}},
+                                       pipeline_config={'pipeline': 'my_start_pipeline'},
+                                       task_config={},
+                                       parent=None,
+                                       trigger_rule='all_success')
         task0.apply_task_to_dag()
 
         self.assertEqual(len(dag.tasks), 1)
diff --git a/tests/runners/airflow/tasks/test_python.py b/tests/runners/airflow/tasks/test_python.py
index 051fc21..00920ee 100644
--- a/tests/runners/airflow/tasks/test_python.py
+++ b/tests/runners/airflow/tasks/test_python.py
@@ -135,14 +135,27 @@
         if executors:
             task_config['executors'] = executors
 
-        return python.PythonTask(dag=dag,
-                                 liminal_config=self.liminal_config,
-                                 pipeline_config={
-                                     'pipeline': 'my_pipeline'
-                                 },
-                                 task_config=task_config,
-                                 parent=parent,
-                                 trigger_rule='all_success')
+        return python.PythonTask(
+            task_id=task_id,
+            dag=dag,
+            liminal_config={
+                'volumes': [
+                    {
+                        'volume': self._VOLUME_NAME,
+                        'local': {
+                            'path': self.temp_dir.replace(
+                                "/var/folders",
+                                "/private/var/folders"
+                            )
+                        }
+                    }
+                ]},
+            pipeline_config={
+                'pipeline': 'my_pipeline'
+            },
+            task_config=task_config,
+            parent=parent,
+            trigger_rule='all_success')
 
 
 if __name__ == '__main__':
diff --git a/tests/runners/airflow/tasks/defaults/__init__.py b/tests/runners/apps/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/defaults/__init__.py
copy to tests/runners/apps/__init__.py
diff --git a/tests/runners/airflow/tasks/defaults/__init__.py b/tests/runners/apps/test_app/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/defaults/__init__.py
copy to tests/runners/apps/test_app/__init__.py
diff --git a/tests/runners/airflow/tasks/defaults/__init__.py b/tests/runners/apps/test_app/defaults/__init__.py
similarity index 100%
rename from tests/runners/airflow/tasks/defaults/__init__.py
rename to tests/runners/apps/test_app/defaults/__init__.py
diff --git a/tests/runners/apps/test_app/defaults/liminal.yml b/tests/runners/apps/test_app/defaults/liminal.yml
new file mode 100644
index 0000000..ef4c3c5
--- /dev/null
+++ b/tests/runners/apps/test_app/defaults/liminal.yml
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: default_liminal
+type: super
+service_defaults:
+  one: two
+task_defaults:
+  env_vars:
+    env1: a
+pipeline_defaults:
+  one: one
+  two: twoww
\ No newline at end of file
diff --git a/tests/runners/airflow/tasks/defaults/__init__.py b/tests/runners/apps/test_app/extra/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/defaults/__init__.py
copy to tests/runners/apps/test_app/extra/__init__.py
diff --git a/tests/runners/apps/test_app/extra/liminal.yml b/tests/runners/apps/test_app/extra/liminal.yml
new file mode 100644
index 0000000..daef21e
--- /dev/null
+++ b/tests/runners/apps/test_app/extra/liminal.yml
@@ -0,0 +1,53 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: extra_liminal
+type: super
+super: default_liminal
+service_defaults:
+  endpoints:
+    - endpoint: /myendpoint1
+      module: myserver.my_server
+      function: myendpoint1func
+images:
+  - image: my_static_input_task_image
+    type: python
+    source: helloworld
+  - image: my_task_output_input_task_image
+    type: python
+    source: helloworld
+pipeline_defaults:
+  tasks:
+    - task: my_static_input_task
+      type: python
+      description: static input task
+      image: my_static_input_task_image
+      env_vars:
+        env2: "b"
+      cmd: python -u hello_world.py
+    - task: extenable
+      type: pipeline
+      description: optional sub tasks
+    - task: my_task_output_input_task
+      type: python
+      description: task with input from other task's output
+      image: my_task_output_input_task_image
+      env_vars:
+        env1: "c"
+        env2: "b"
+      cmd: python -u hello_world.py
diff --git a/tests/runners/apps/test_app/helloworld/__init__.py b/tests/runners/apps/test_app/helloworld/__init__.py
new file mode 100644
index 0000000..82df24d
--- /dev/null
+++ b/tests/runners/apps/test_app/helloworld/__init__.py
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+print('Hello world!\n')
diff --git a/tests/runners/apps/test_app/helloworld/hello_world.py b/tests/runners/apps/test_app/helloworld/hello_world.py
new file mode 100644
index 0000000..e5c70d8
--- /dev/null
+++ b/tests/runners/apps/test_app/helloworld/hello_world.py
@@ -0,0 +1,38 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import os
+
+inputs_dir = '/mnt/vol1/inputs/'
+
+num_files = int(os.environ['NUM_FILES'])
+num_splits = int(os.environ['NUM_SPLITS'])
+
+# create input files - split by round robin
+for i in range(0, num_files):
+    split_id = i % num_splits
+    split_dir = os.path.join(inputs_dir, str(split_id))
+
+    if not os.path.exists(split_dir):
+        os.makedirs(split_dir)
+
+    filename = os.path.join(split_dir, f'input{i}.json')
+    with open(filename, 'w') as f:
+        print(f'Writing input file {filename}')
+        f.write(json.dumps({'mykey': f'myval{i}'}))
diff --git a/tests/runners/apps/test_app/liminal.yml b/tests/runners/apps/test_app/liminal.yml
new file mode 100644
index 0000000..9c8e52c
--- /dev/null
+++ b/tests/runners/apps/test_app/liminal.yml
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: MyPipeline
+super: extra_liminal
+images:
+  - image: my_static_input_task_image
+    type: python
+    source: helloworld
+  - image: my_server_image
+    type: python
+    source: myserver
+pipelines:
+  - pipeline: my_pipeline
+    owner: Bosco Albert Baracus
+    start_date: 1970-01-01
+    timeout_minutes: 45
+    schedule: 0 * 1 * *
+    default_arg_loaded: check
+    default_array_loaded: [ 2, 3, 4 ]
+    default_object_loaded:
+      key1: val1
+      key2: val2
+    tasks:
+      - task: my_parallelized_static_input_task
+        type: python
+        description: parallelized static input task
+        image: my_static_input_task_image
+        executors: 2
+        cmd: python -u hello_world.py
+services:
+  - service:
+    name: my_python_server
+    type: python_server
+    description: my python server
+    image: my_server_image
diff --git a/tests/runners/airflow/tasks/defaults/__init__.py b/tests/runners/apps/test_app/my_server/__init__.py
similarity index 100%
copy from tests/runners/airflow/tasks/defaults/__init__.py
copy to tests/runners/apps/test_app/my_server/__init__.py
diff --git a/tests/runners/apps/test_app/my_server/my_server.py b/tests/runners/apps/test_app/my_server/my_server.py
new file mode 100644
index 0000000..265d3be
--- /dev/null
+++ b/tests/runners/apps/test_app/my_server/my_server.py
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import json
+
+
+def myendpoint1func(input_json):
+    input_dict = json.loads(input_json) if input_json else {}
+    return f'Input was: {input_dict}'
diff --git a/tests/util/test_class_utils.py b/tests/util/test_class_utils.py
index 82df0a0..bdfacfa 100644
--- a/tests/util/test_class_utils.py
+++ b/tests/util/test_class_utils.py
@@ -19,32 +19,24 @@
 from unittest import TestCase
 
 from liminal.core.util import class_util
-from tests.util.test_pkg_1.test_clazz_base import A, Z
+from tests.util.test_pkg_1.test_clazz_base import A
 from tests.util.test_pkg_1.test_pkg_1_1.test_clazz_child_1 import B
 from tests.util.test_pkg_1.test_pkg_1_1.test_clazz_child_2 import C
-from tests.util.test_pkg_1.test_pkg_1_1.test_pkg_1_1_1.test_clazz_leaf_1 import F, D, E
-from tests.util.test_pkg_1.test_pkg_1_1.test_pkg_1_1_2.test_clazz_leaf_2 import G, H
+from tests.util.test_pkg_1.test_pkg_1_1.test_pkg_1_1_1.test_clazz_leaf_1 import D
+from tests.util.test_pkg_1.test_pkg_1_1.test_pkg_1_1_2.test_clazz_leaf_2 import E
 
 
 class Test(TestCase):
     def test_find_full_hierarchy_from_root(self):
-        expected_set = set([B, C, D, E, H, Z])
+        expected_set = {
+            'test_clazz_child_1': B,
+            'test_clazz_child_2': C,
+            'test_clazz_leaf_1': D,
+            'test_clazz_leaf_2': E
+        }
         self.hierarchy_check(A, expected_set)
 
-    def test_find_full_hierarchy_mid_tree_in_package(self):
-        expected_set = set([G])
-        self.hierarchy_check(F, expected_set)
-
-    def test_leaf_class(self):
-        expected_set = set()
-        self.hierarchy_check(G, expected_set)
-
-    def hierarchy_check(self, clazz, expected_set):
+    def hierarchy_check(self, clazz, expected_dict):
         pkg_root = 'tests.util.test_pkg_1'
-        full_tree = class_util.find_subclasses_in_packages(
-            [pkg_root],
-            clazz)
-
-        res_set = set()
-        res_set.update(full_tree.values())
-        self.assertEqual(res_set, expected_set)
+        result_dict = class_util.find_subclasses_in_packages([pkg_root], clazz)
+        self.assertDictEqual(result_dict, expected_dict)
diff --git a/tests/util/test_pkg_1/test_clazz_base.py b/tests/util/test_pkg_1/test_clazz_base.py
index 3e7c523..a81771f 100644
--- a/tests/util/test_pkg_1/test_clazz_base.py
+++ b/tests/util/test_pkg_1/test_clazz_base.py
@@ -19,7 +19,3 @@
 
 class A:
     pass
-
-
-class Z(A):
-    pass
diff --git a/tests/util/test_pkg_1/test_pkg_1_1/test_pkg_1_1_1/test_clazz_leaf_1.py b/tests/util/test_pkg_1/test_pkg_1_1/test_pkg_1_1_1/test_clazz_leaf_1.py
index 2aba50e..77a6ebc 100644
--- a/tests/util/test_pkg_1/test_pkg_1_1/test_pkg_1_1_1/test_clazz_leaf_1.py
+++ b/tests/util/test_pkg_1/test_pkg_1_1/test_pkg_1_1_1/test_clazz_leaf_1.py
@@ -18,16 +18,7 @@
 
 
 from tests.util.test_pkg_1.test_pkg_1_1.test_clazz_child_1 import B
-from tests.util.test_pkg_1.test_pkg_1_1.test_clazz_child_2 import C
 
 
 class D(B):
     pass
-
-
-class E(C):
-    pass
-
-
-class F:
-    pass
diff --git a/tests/util/test_pkg_1/test_pkg_1_1/test_pkg_1_1_2/test_clazz_leaf_2.py b/tests/util/test_pkg_1/test_pkg_1_1/test_pkg_1_1_2/test_clazz_leaf_2.py
index 3f96496..dcee083 100644
--- a/tests/util/test_pkg_1/test_pkg_1_1/test_pkg_1_1_2/test_clazz_leaf_2.py
+++ b/tests/util/test_pkg_1/test_pkg_1_1/test_pkg_1_1_2/test_clazz_leaf_2.py
@@ -15,12 +15,8 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from tests.util.test_pkg_1.test_pkg_1_1.test_pkg_1_1_1.test_clazz_leaf_1 import F, E
+from tests.util.test_pkg_1.test_pkg_1_1.test_pkg_1_1_1.test_clazz_leaf_1 import D
 
 
-class G(F):
-    pass
-
-class H(E):
+class E(D):
     pass