#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import os
import traceback
from datetime import datetime, timedelta

from airflow import DAG

from liminal.core.config.config import ConfigUtil
from liminal.core.util import extensible
from liminal.runners.airflow.executors import airflow

__DEPENDS_ON_PAST = 'depends_on_past'


# noinspection PyBroadException
def register_dags(configs_path):
"""
Registers pipelines in liminal yml files found in given path (recursively) as airflow DAGs.
"""
logging.info(f'Registering DAGs from path: {configs_path}')
    logging.info('===========================================')
config_util = ConfigUtil(configs_path)
configs = config_util.safe_load(is_render_variables=False)
if os.getenv('POD_NAMESPACE') != "jenkins":
config_util.snapshot_final_liminal_configs()
dags = []
logging.info(f'found {len(configs)} liminal configs in path: {configs_path}')
for config in configs:
        name = config.get('name')
try:
if not name:
raise ValueError('liminal.yml missing field `name`')
logging.info(f"Registering DAGs for {name}")
owner = config.get('owner')
            # 'all_done' lets a task run even when an upstream task failed; the default
            # 'all_success' only runs a task once all of its upstream tasks succeeded.
            trigger_rule = 'all_done' if config.get('always_run') else 'all_success'
executors = __initialize_executors(config)
default_executor = airflow.AirflowExecutor("default_executor", liminal_config=config, executor_config={})
for pipeline in config['pipelines']:
default_args = __default_args(pipeline)
dag = __initialize_dag(default_args, pipeline, owner)
                parent = None  # tasks in a pipeline are chained; each applied task becomes the next task's parent
for task in pipeline['tasks']:
task_type = task['type']
task_instance = get_task_class(task_type)(
task_id=task['task'],
dag=dag,
parent=parent,
trigger_rule=trigger_rule,
liminal_config=config,
pipeline_config=pipeline,
task_config=task,
variables=config.get('variables', {}),
)
executor_id = task.get('executor')
if executor_id:
executor = executors[executor_id]
else:
                        logging.info(
                            f"Did not find `executor` in {task['task']} config."
                            f" Using the default executor ({type(default_executor)})"
                            f" instead."
                        )
executor = default_executor
parent = executor.apply_task_to_dag(task=task_instance)
logging.info(f'registered DAG {dag.dag_id}: {dag.tasks}')
dags.append((pipeline['pipeline'], dag))
except Exception:
logging.error(f'Failed to register DAGs for {name}')
traceback.print_exc()
    return dags


def __initialize_executors(liminal_config):
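    """
    Instantiate the executors declared in the liminal config, keyed by executor name.

    Illustrative entry (the name and type are assumed values, not the real schema):
    {'executor': 'k8s', 'type': 'kubernetes'} would yield {'k8s': <executor instance>}.
    """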
executors = {}
    for executor_config in liminal_config.get('executors', []):
executors[executor_config['executor']] = get_executor_class(executor_config['type'])(
executor_config['executor'], liminal_config, executor_config
)
    return executors


def __initialize_dag(default_args, pipeline, owner):
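    """
    Build the Airflow DAG object for a single pipeline.

    The schedule is taken from `schedule_interval` (falling back to `schedule`),
    liminal-specific keys are stripped from default_args, and catchup is disabled.
    Fields consumed here, shown with assumed example values:
    {'pipeline': 'my_pipeline', 'start_date': date(2020, 1, 1), 'timeout_minutes': 45}.
    """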
pipeline_name = pipeline['pipeline']
    schedule_interval = default_args.get('schedule_interval') or default_args.get('schedule')
if owner and 'owner' not in default_args:
default_args['owner'] = owner
    start_date = pipeline.get('start_date', datetime.min)
    if not isinstance(start_date, datetime):
        # YAML typically parses start_date as a date; normalise it to a datetime at midnight.
        start_date = datetime.combine(start_date, datetime.min.time())
default_args.pop('tasks', None)
default_args.pop('schedule', None)
default_args.pop('monitoring', None)
default_args.pop('schedule_interval', None)
dag = DAG(
dag_id=pipeline_name,
default_args=default_args,
dagrun_timeout=timedelta(minutes=pipeline['timeout_minutes']),
start_date=start_date,
schedule_interval=schedule_interval,
catchup=False,
)
    return dag


def __default_args(pipeline):
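    """
    Build per-DAG default_args from the pipeline config.

    The pipeline dict is copied as-is; 'start_date' is normalised to a datetime at
    midnight and 'depends_on_past' defaults to False. For example (values assumed),
    {'pipeline': 'p', 'start_date': date(2020, 1, 1)} gains
    'start_date': datetime(2020, 1, 1, 0, 0) and 'depends_on_past': False.
    """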
    default_args = dict(pipeline)  # shallow copy; __initialize_dag pops liminal-only keys from it later
    override_args = {
        'start_date': datetime.combine(pipeline['start_date'], datetime.min.time()),
        __DEPENDS_ON_PAST: default_args.get(__DEPENDS_ON_PAST, False),
    }
default_args.update(override_args)
    return default_args


logging.info('Loading task implementations...')
tasks_by_liminal_name = extensible.load_tasks()
logging.info(f'Finished loading task implementations: {tasks_by_liminal_name.keys()}')
executors_by_liminal_name = extensible.load_executors()
logging.info(f'Finished loading executor implementations: {executors_by_liminal_name.keys()}')


def get_task_class(task_type):
    return tasks_by_liminal_name[task_type]


def get_executor_class(executor_type):
return executors_by_liminal_name[executor_type]