# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Parse YAML file to create workflow."""
from __future__ import annotations
import logging
import os
import re
from pathlib import Path
from typing import Any
from pydolphinscheduler import configuration, tasks
from pydolphinscheduler.constants import Symbol
from pydolphinscheduler.core.parameter import ParameterType
from pydolphinscheduler.core.task import Task
from pydolphinscheduler.core.workflow import Workflow
from pydolphinscheduler.exceptions import PyDSTaskNoFoundException
from pydolphinscheduler.utils.yaml_parser import YamlParser
logger = logging.getLogger(__name__)
KEY_WORKFLOW = "workflow"
KEY_TASK = "tasks"
KEY_TASK_TYPE = "task_type"
KEY_DEPS = "deps"
KEY_OP = "op"
TASK_SPECIAL_KEYS = [KEY_TASK_TYPE, KEY_DEPS]
class ParseTool:
"""Enhanced parsing tools."""
@staticmethod
def parse_string_param_if_file(string_param: str, **kwargs):
"""Use $FILE{"data_path"} to load file from "data_path"."""
if string_param.startswith("$FILE"):
path = re.findall(r"\$FILE\{\"(.*?)\"\}", string_param)[0]
base_folder = kwargs.get("base_folder", ".")
path = ParseTool.get_possible_path(path, base_folder)
with open(path) as read_file:
string_param = "".join(read_file)
return string_param
@staticmethod
def parse_string_param_if_env(string_param: str, **kwargs):
"""Use $ENV{env_name} to load environment variable "env_name"."""
if "$ENV" in string_param:
key = re.findall(r"\$ENV\{(.*?)\}", string_param)[0]
env_value = os.environ.get(key, f"${key}")
string_param = string_param.replace(f"$ENV{{{key}}}", env_value)
return string_param
@staticmethod
def parse_string_param_if_config(string_param: str, **kwargs):
"""Use ${CONFIG.var_name} to load variable "var_name" from configuration."""
if "${CONFIG" in string_param:
key = re.findall(r"\$\{CONFIG\.(.*?)\}", string_param)[0]
if hasattr(configuration, key):
string_param = getattr(configuration, key)
else:
string_param = configuration.get_single_config(key)
return string_param
@staticmethod
def parse_string_param_if_parameter(string_param: str, **kwargs):
"""Use TYPE(value) to set local params."""
key_path = kwargs.get("key_path")
if key_path.split(Symbol.POINT)[0] not in {"input_params", "output_params"}:
return string_param
if not isinstance(string_param, str):
return string_param
result = re.findall(r"^(.*?)\((.*?)\)", string_param)
if len(result) == 1 and len(result[0]) == 2:
type_ = result[0][0].rstrip()
value = result[0][1].rstrip()
return ParameterType.type_sets[type_](value)
else:
return string_param
@staticmethod
def get_possible_path(file_path, base_folder):
"""Get file possible path.
Return new path if file_path is not exists, but base_folder + file_path exists
"""
possible_path = file_path
if not Path(file_path).exists():
new_path = Path(base_folder).joinpath(file_path)
if new_path.exists():
possible_path = new_path
logger.info(f"{file_path} not exists, convert to {possible_path}")
return possible_path
def get_task_cls(task_type) -> type[Task]:
"""Get the task class by task_type (case-insensitive)."""
# only get task class from tasks.__all__
all_task_types = {type_.capitalize(): type_ for type_ in tasks.__all__}
task_type_cap = task_type.capitalize()
if task_type_cap not in all_task_types:
raise PyDSTaskNoFoundException(f"cannot find task type {task_type}")
standard_name = all_task_types[task_type_cap]
return getattr(tasks, standard_name)
class YamlWorkflow(YamlParser):
"""Yaml parser for create workflow.
:param yaml_file: yaml file path.
examples1 ::
parser = YamlParser(yaml_file=...)
parser.create_workflow()
examples2 ::
YamlParser(yaml_file=...).create_workflow()
"""
_parse_rules = [
ParseTool.parse_string_param_if_file,
ParseTool.parse_string_param_if_env,
ParseTool.parse_string_param_if_config,
ParseTool.parse_string_param_if_parameter,
]
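# The rules are applied in order to every string value: file inclusion first,
# then environment variables, then configuration lookups, then parameter casting.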
def __init__(self, yaml_file: str):
with open(yaml_file) as f:
content = f.read()
self._base_folder = Path(yaml_file).parent
content = self.prepare_refer_workflow(content)
super().__init__(content)
def create_workflow(self):
"""Create workflow main function."""
# get workflow parameters with key "workflow"
workflow_params = self[KEY_WORKFLOW]
# pop "run" parameter, used at the end
is_run = workflow_params.pop("run", False)
# use YamlWorkflow._parse_rules to parse special value of yaml file
workflow_params = self.parse_params(workflow_params)
workflow_name = workflow_params["name"]
logger.info(f"Create workflow: {workflow_name}")
with Workflow(**workflow_params) as workflow:
# save dependencies between tasks
dependencies = {}
# save name and task mapping
name2task = {}
# get task data with key "tasks"
for task_data in self[KEY_TASK]:
task = self.parse_task(task_data, name2task)
deps = task_data.get(KEY_DEPS, [])
if deps:
dependencies[task.name] = deps
name2task[task.name] = task
# build dependencies between tasks
for downstream_task_name, deps in dependencies.items():
downstream_task = name2task[downstream_task_name]
for upstream_task_name in deps:
upstream_task = name2task[upstream_task_name]
upstream_task >> downstream_task
workflow.submit()
# if is_run is set, run the workflow after submitting
if is_run:
logger.info(f"run workflow: {workflow}")
workflow.run()
return workflow_name
def parse_params(self, params: Any, key_path=""):
"""Recursively resolves the parameter values.
The function operates params only when it encounters a string; other types continue recursively.
"""
if isinstance(params, str):
for parse_rule in self._parse_rules:
params_ = params
params = parse_rule(
params, base_folder=self._base_folder, key_path=key_path
)
if params_ != params:
logger.info(f"parse {params_} -> {params}")
elif isinstance(params, list):
for index in range(len(params)):
params[index] = self.parse_params(params[index], key_path)
elif isinstance(params, dict):
for key, value in params.items():
if not key_path:
new_key_path = key
else:
new_key_path = key_path + Symbol.POINT + key
params[key] = self.parse_params(value, new_key_path)
return params
@classmethod
def parse(cls, yaml_file: str):
"""Recursively resolves the parameter values.
The function operates params only when it encounters a string; other types continue recursively.
"""
workflow_name = cls(yaml_file).create_workflow()
return workflow_name
def prepare_refer_workflow(self, content):
"""Allow YAML files to reference workflow derived from other YAML files."""
workflow_paths = re.findall(r"\$WORKFLOW\{\"(.*?)\"\}", content)
for workflow_path in workflow_paths:
logger.info(
f"found special token {workflow_path}, loading workflow from {workflow_path}"
)
possible_path = ParseTool.get_possible_path(
workflow_path, self._base_folder
)
workflow_name = YamlWorkflow.parse(possible_path)
content = content.replace(f'$WORKFLOW{{"{workflow_path}"}}', workflow_name)
return content
def parse_task(self, task_data: dict, name2task: dict[str, Task]):
"""Parse various types of tasks.
:param task_data: dict.
{
"task_type": "Shell",
"params": {"name": "shell_task", "command":"ehco hellp"}
}
:param name2task: Dict[str, Task]), mapping of task_name and task
Some task type have special parse func:
if task type is Switch, use parse_switch;
if task type is Condition, use parse_condition;
if task type is Dependent, use parse_dependent;
other, we pass all task_params as input to task class, like "task_cls(**task_params)".
"""
task_type = task_data["task_type"]
# get params without special key
task_params = {k: v for k, v in task_data.items() if k not in TASK_SPECIAL_KEYS}
task_cls = get_task_cls(task_type)
# use YamlWorkflow._parse_rules to parse special value of yaml file
task_params = self.parse_params(task_params)
if task_cls == tasks.Switch:
task = self.parse_switch(task_params, name2task)
elif task_cls == tasks.Condition:
task = self.parse_condition(task_params, name2task)
elif task_cls == tasks.Dependent:
task = self.parse_dependent(task_params, name2task)
else:
task = task_cls(**task_params)
logger.info(f"Create {task_type} task: {task}")
return task
def parse_switch(self, task_params, name2task):
"""Parse Switch Task.
This is an example Yaml fragment of task_params
name: switch
condition:
- ["${var} > 1", switch_child_1]
- switch_child_2
"""
from pydolphinscheduler.tasks.switch import (
Branch,
Default,
Switch,
SwitchCondition,
)
condition_datas = task_params["condition"]
conditions = []
for condition_data in condition_datas:
assert "task" in condition_data, f"task must be in {condition_data}"
task_name = condition_data["task"]
condition_string = condition_data.get("condition", None)
# if condition_string is None, for example: {"task": "switch_child_2"}, set it to Default branch
if condition_string is None:
conditions.append(Default(task=name2task.get(task_name)))
# if condition_string is not None, for example:
# {"task": "switch_child_2", "condition": "${var} > 1"} set it to Branch
else:
conditions.append(
Branch(condition_string, task=name2task.get(task_name))
)
switch = Switch(
name=task_params["name"], condition=SwitchCondition(*conditions)
)
return switch
def parse_condition(self, task_params, name2task):
"""Parse Condition Task.
This is an example Yaml fragment of task_params
name: condition
success_task: success_branch
failed_task: fail_branch
OP: AND
groups:
-
OP: AND
groups:
- [pre_task_1, true]
- [pre_task_2, true]
- [pre_task_3, false]
-
OP: AND
groups:
- [pre_task_1, false]
- [pre_task_2, true]
- [pre_task_3, true]
"""
from pydolphinscheduler.tasks.condition import (
FAILURE,
SUCCESS,
And,
Condition,
Or,
)
def get_op_cls(op):
cls = None
if op.lower() == "and":
cls = And
elif op.lower() == "or":
cls = Or
else:
raise ValueError(f"op must be 'and' or 'or', but got: {op}")
return cls
second_cond_ops = []
for first_group in task_params["groups"]:
second_op = first_group["op"]
task_ops = []
for condition_data in first_group["groups"]:
assert "task" in condition_data, f"task must be in {condition_data}"
assert "flag" in condition_data, f"flag must be in {condition_data}"
task_name = condition_data["task"]
flag = condition_data["flag"]
task = name2task[task_name]
# for example: task = pre_task_1, flag = true
if flag:
task_ops.append(SUCCESS(task))
else:
task_ops.append(FAILURE(task))
second_cond_ops.append(get_op_cls(second_op)(*task_ops))
first_op = task_params["op"]
cond_operator = get_op_cls(first_op)(*second_cond_ops)
condition = Condition(
name=task_params["name"],
condition=cond_operator,
success_task=name2task[task_params["success_task"]],
failed_task=name2task[task_params["failed_task"]],
)
return condition
def parse_dependent(self, task_params, name2task):
"""Parse Dependent Task.
This is an example Yaml fragment of task_params
name: dependent
denpendence:
OP: AND
groups:
-
OP: Or
groups:
- [pydolphin, task_dependent_external, task_1]
- [pydolphin, task_dependent_external, task_2]
-
OP: And
groups:
- [pydolphin, task_dependent_external, task_1, LAST_WEDNESDAY]
- [pydolphin, task_dependent_external, task_2, last24Hours]
"""
from pydolphinscheduler.tasks.dependent import (
And,
Dependent,
DependentDate,
DependentItem,
Or,
)
def workflow_dependent_date(dependent_date):
"""Parse dependent date (Compatible with key and value of DependentDate)."""
dependent_date_upper = dependent_date.upper()
if hasattr(DependentDate, dependent_date_upper):
dependent_date = getattr(DependentDate, dependent_date_upper)
return dependent_date
def get_op_cls(op):
cls = None
if op.lower() == "and":
cls = And
elif op.lower() == "or":
cls = Or
else:
raise ValueError(f"op must be 'and' or 'or', but got: {op}")
return cls
def create_dependent_item(source_items):
"""Parse dependent item.
project_name: pydolphin
workflow_name: task_dependent_external
dependent_task_name: task_1
dependent_date: LAST_WEDNESDAY
"""
project_name = source_items["project_name"]
workflow_name = source_items["workflow_name"]
dependent_task_name = source_items["dependent_task_name"]
dependent_date = source_items.get("dependent_date", DependentDate.TODAY)
dependent_item = DependentItem(
project_name=project_name,
workflow_name=workflow_name,
dependent_task_name=dependent_task_name,
dependent_date=workflow_dependent_date(dependent_date),
)
return dependent_item
second_dependences = []
for first_group in task_params["groups"]:
second_op = first_group[KEY_OP]
dependence_items = []
for source_items in first_group["groups"]:
dependence_items.append(create_dependent_item(source_items))
second_dependences.append(get_op_cls(second_op)(*dependence_items))
first_op = task_params[KEY_OP]
dependence = get_op_cls(first_op)(*second_dependences)
task = Dependent(
name=task_params["name"],
dependence=dependence,
)
return task
def create_workflow(yaml_file):
"""CLI."""
YamlWorkflow.parse(yaml_file)
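# Example usage (illustrative): a minimal YAML definition and how to load it.
#
#   # tutorial.yaml
#   workflow:
#     name: tutorial
#     run: true
#   tasks:
#     - name: task_parent
#       task_type: Shell
#       command: echo hello
#     - name: task_child
#       task_type: Shell
#       deps: [task_parent]
#       command: echo world
#
#   create_workflow("tutorial.yaml")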