| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """Test Task class function.""" |
| import logging |
| import re |
| import warnings |
| from datetime import timedelta |
| from typing import Set, Tuple |
| from unittest.mock import PropertyMock, patch |
| |
| import pytest |
| |
| from pydolphinscheduler.core.parameter import ParameterType |
| from pydolphinscheduler.core.task import Task, TaskRelation |
| from pydolphinscheduler.core.workflow import Workflow |
| from pydolphinscheduler.exceptions import PyResPluginException |
| from pydolphinscheduler.resources_plugin import Local |
| from tests.testing.task import Task as TestTask |
| from tests.testing.task import TaskWithCode |
| |
# Module-level state shared by every parametrized run of
# ``test_task_relation_add_to_set``: the set accumulates each ``TaskRelation`` that
# the test adds, and the counter holds the size the set is expected to have so far.
TEST_TASK_RELATION_SET = set()
TEST_TASK_RELATION_SIZE = 0
| |
| |
@pytest.mark.parametrize(
    "addition, ignore, expect",
    [
        # nothing added, nothing ignored: only the default attribute names
        (
            set(),
            set(),
            {
                "local_params",
                "resource_list",
                "dependence",
                "wait_start_timeout",
                "condition_result",
            },
        ),
        # ignored names are removed; ignoring an unknown name is harmless
        (
            set(),
            {"dependence", "condition_result", "not_exists"},
            {"local_params", "resource_list", "wait_start_timeout"},
        ),
        # custom names are added on top of the defaults
        (
            {"not_exists_1", "not_exists_2"},
            set(),
            {
                "not_exists_1",
                "not_exists_2",
                "local_params",
                "resource_list",
                "dependence",
                "wait_start_timeout",
                "condition_result",
            },
        ),
        # test addition and ignore conflict to see behavior
        (
            {"not_exists"},
            {"condition_result", "not_exists"},
            {
                "not_exists",
                "local_params",
                "resource_list",
                "dependence",
                "wait_start_timeout",
            },
        ),
    ],
)
def test__get_attr(addition: Set, ignore: Set, expect: Set):
    """Test task function `_get_attr` with custom and ignored attribute names.

    :param addition: value assigned to the task's ``_task_custom_attr``.
    :param ignore: value assigned to the task's ``_task_ignore_attr``.
    :param expect: attribute-name set that ``_get_attr`` is expected to return.
    """
    task = TestTask(task_type="test", name="test-get-attr")
    task._task_custom_attr = addition
    task._task_ignore_attr = ignore
    collected = task._get_attr()
    assert collected == expect
| |
| |
@pytest.mark.parametrize(
    "value, expect",
    [
        (None, (0, "CLOSE")),
        (timedelta(seconds=0.1), (1, "OPEN")),
        (timedelta(seconds=61), (2, "OPEN")),
        (timedelta(seconds=0), (0, "CLOSE")),
        (timedelta(minutes=1.3), (2, "OPEN")),
    ],
)
def test_task_timeout(value: timedelta, expect: Tuple[int, str]):
    """Test task timeout attribute and the matching timeout flag.

    :param value: ``timeout`` argument handed to the task constructor.
    :param expect: pair of expected ``task.timeout`` and ``task.timeout_flag`` values.
    """
    expected_timeout, expected_flag = expect
    task = TestTask(name="test-get-attr", task_type="test", timeout=value)
    assert task.timeout == expected_timeout
    assert task.timeout_flag == expected_flag
| |
| |
@pytest.mark.parametrize(
    "attr, expect",
    [
        (
            dict(),
            {
                "localParams": [],
                "resourceList": [],
                "dependence": {},
                "waitStartTimeout": {},
                "conditionResult": {"successNode": [""], "failedNode": [""]},
            },
        ),
        (
            {
                "local_params": ["foo", "bar"],
                "resource_list": ["foo", "bar"],
                "dependence": {"foo", "bar"},
                "wait_start_timeout": {"foo", "bar"},
                "condition_result": {"foo": ["bar"]},
            },
            {
                "localParams": ["foo", "bar"],
                "resourceList": [{"id": 1}],
                "dependence": {"foo", "bar"},
                "waitStartTimeout": {"foo", "bar"},
                "conditionResult": {"foo": ["bar"]},
            },
        ),
    ],
)
@patch(
    "pydolphinscheduler.core.resource.Resource.get_id_from_database",
    return_value=1,
)
@patch(
    "pydolphinscheduler.core.task.Task.user_name",
    return_value="test_user",
)
def test_property_task_params(mock_user_name, mock_resource, attr, expect):
    """Test class task property ``task_params``.

    Stacked ``patch`` decorators are applied bottom-up, so the mock for the decorator
    closest to the function (``Task.user_name``) is injected first and the mock for
    ``Resource.get_id_from_database`` second. The mock parameters are named to match.

    :param mock_user_name: mock replacing ``Task.user_name``.
    :param mock_resource: mock replacing ``Resource.get_id_from_database`` (returns 1).
    :param attr: extra keyword arguments passed to the task constructor.
    :param expect: dictionary that ``task.task_params`` is expected to equal.
    """
    task = TestTask(
        "test-property-task-params",
        "test-task",
        **attr,
    )
    assert expect == task.task_params
| |
| |
@pytest.mark.parametrize(
    "pre_code, post_code, expect",
    [
        (123, 456, hash("123 -> 456")),
        (12345678, 987654321, hash("12345678 -> 987654321")),
    ],
)
def test_task_relation_hash_func(pre_code, post_code, expect):
    """Test TaskRelation magic function :func:`__hash__`.

    Each case expects the relation's hash to equal the hash of the string
    ``"<pre_code> -> <post_code>"``.
    """
    relation = TaskRelation(post_task_code=post_code, pre_task_code=pre_code)
    actual = hash(relation)
    assert actual == expect
| |
| |
@pytest.mark.parametrize(
    "pre_code, post_code, size_add",
    [
        (123, 456, 1),
        (123, 456, 0),
        (456, 456, 1),
        (123, 123, 1),
        (456, 123, 1),
        (0, 456, 1),
        (123, 0, 1),
    ],
)
def test_task_relation_add_to_set(pre_code, post_code, size_add):
    """Test how adding TaskRelation objects with various codes changes a set's size.

    The set of :class:`TaskRelation` instances and its expected size live in module-level
    variables, so they carry over from one parametrized case to the next; ``size_add``
    is how much the set is expected to grow when this case's relation is added.
    """
    # Declare the counter as global so the update below rebinds the module-level name.
    global TEST_TASK_RELATION_SIZE
    TEST_TASK_RELATION_SET.add(
        TaskRelation(pre_task_code=pre_code, post_task_code=post_code)
    )
    TEST_TASK_RELATION_SIZE = TEST_TASK_RELATION_SIZE + size_add
    assert len(TEST_TASK_RELATION_SET) == TEST_TASK_RELATION_SIZE
| |
| |
def test_task_relation_to_dict():
    """Test that ``TaskRelation.get_define`` returns the expected dictionary."""
    pre_task_code, post_task_code = 123, 456
    task_relation = TaskRelation(
        pre_task_code=pre_task_code, post_task_code=post_task_code
    )
    expect = dict(
        name="",
        preTaskCode=pre_task_code,
        postTaskCode=post_task_code,
        preTaskVersion=1,
        postTaskVersion=1,
        conditionType=0,
        conditionParams={},
    )
    assert task_relation.get_define() == expect
| |
| |
def test_task_get_define():
    """Test that ``Task.get_define`` returns the full definition of a default task.

    ``Task.gen_code_and_version`` is patched to return a fixed ``(code, version)`` pair
    while the task is built and its definition is checked.
    """
    code, version = 123, 1
    name = "test_task_get_define"
    task_type = "test_task_get_define_type"
    expected_task_params = {
        "resourceList": [],
        "localParams": [],
        "dependence": {},
        "conditionResult": {"successNode": [""], "failedNode": [""]},
        "waitStartTimeout": {},
    }
    expect = {
        "code": code,
        "name": name,
        "version": version,
        "description": None,
        "delayTime": 0,
        "taskType": task_type,
        "taskParams": expected_task_params,
        "flag": "YES",
        "isCache": "NO",
        "taskPriority": "MEDIUM",
        "workerGroup": "default",
        "environmentCode": None,
        "failRetryTimes": 0,
        "failRetryInterval": 1,
        "timeoutFlag": "CLOSE",
        "timeoutNotifyStrategy": None,
        "timeout": 0,
    }
    fixed_code_and_version = patch(
        "pydolphinscheduler.core.task.Task.gen_code_and_version",
        return_value=(code, version),
    )
    with fixed_code_and_version:
        task = Task(name=name, task_type=task_type)
        assert task.get_define() == expect
| |
| |
@pytest.mark.parametrize("shift", ["<<", ">>"])
def test_two_tasks_shift(shift: str):
    """Test bit operator between tasks.

    Here we test both `>>` and `<<` bit operator.

    :param shift: operator under test, either ``"<<"`` or ``">>"``.
    """
    upstream = TestTask(name="upstream", task_type=shift)
    downstream = TestTask(name="downstream", task_type=shift)
    if shift == "<<":
        downstream << upstream
    elif shift == ">>":
        upstream >> downstream
    else:
        # ``pytest.fail`` still fails the test when Python runs with ``-O``, where a
        # bare ``assert False`` would be stripped.
        pytest.fail(f"Unexpect bit operator type {shift}.")

    # Size and membership are asserted separately so a failure names the broken condition.
    assert 1 == len(
        upstream._downstream_task_codes
    ), "Task downstream task attributes error, downstream codes size failed."
    assert (
        downstream.code in upstream._downstream_task_codes
    ), "Task downstream task attributes error, specific downstream code failed."
    assert 1 == len(
        downstream._upstream_task_codes
    ), "Task upstream task attributes error, upstream codes size failed."
    assert (
        upstream.code in downstream._upstream_task_codes
    ), "Task upstream task attributes error, specific upstream code failed."
| |
| |
@pytest.mark.parametrize(
    "dep_expr, flag",
    [
        ("task << tasks", "upstream"),
        ("tasks << task", "downstream"),
        ("task >> tasks", "downstream"),
        ("tasks >> task", "upstream"),
    ],
)
def test_tasks_list_shift(dep_expr: str, flag: str):
    """Test bit operator between task and sequence of tasks.

    Here we test both `>>` and `<<` bit operator.

    :param dep_expr: expression over the local names ``task`` and ``tasks`` to evaluate.
    :param flag: which side of ``task`` the ``tasks`` should end up on after evaluation,
        ``"upstream"`` or ``"downstream"``.
    """
    reverse_dict = {
        "upstream": "downstream",
        "downstream": "upstream",
    }
    task_type = "dep_task_and_tasks"
    task = TestTask(name="upstream", task_type=task_type)
    tasks = [
        TestTask(name="downstream1", task_type=task_type),
        TestTask(name="downstream2", task_type=task_type),
    ]

    # Use built-in function eval to simplify the test case and reduce duplicate code.
    # ``dep_expr`` only ever holds one of the fixed literals from the parametrize table,
    # and it relies on the local names ``task`` and ``tasks`` defined above.
    eval(dep_expr)
    direction_attr = f"_{flag}_task_codes"
    reverse_direction_attr = f"_{reverse_dict[flag]}_task_codes"

    related_codes = getattr(task, direction_attr)
    assert 2 == len(related_codes)
    # A bare list is truthy whenever it is non-empty, so the membership results must be
    # reduced with ``all`` for this assertion to check anything.
    assert all(t.code in related_codes for t in tasks)

    assert all(1 == len(getattr(t, reverse_direction_attr)) for t in tasks)
    assert all(task.code in getattr(t, reverse_direction_attr) for t in tasks)
| |
| |
def test_add_duplicate(caplog):
    """Test add task which code already in workflow.

    A second task with the same code is created inside the same workflow, and a
    WARNING record mentioning "already in workflow" is expected from a
    ``pydolphinscheduler`` logger.

    :param caplog: pytest log-capturing fixture.
    """
    with Workflow("test_add_duplicate_workflow") as _:
        TaskWithCode(name="test_task_1", task_type="test", code=123, version=1)
        with caplog.at_level(logging.WARNING):
            TaskWithCode(
                name="test_task_duplicate_code", task_type="test", code=123, version=2
            )
        # Inspect the structured records rather than the rendered text, so the check does
        # not depend on the configured log format (e.g. level-name column padding).
        duplicate_warnings = [
            record
            for record in caplog.records
            if record.levelno == logging.WARNING
            and record.name.startswith("pydolphinscheduler")
            and "already in workflow" in record.getMessage()
        ]
        assert duplicate_warnings, (
            "Expected a pydolphinscheduler WARNING containing 'already in workflow', "
            f"got log output: {caplog.text!r}"
        )
| |
| |
def test_use_deprecated_pd(caplog):
    """Test raise warning when using process_definition to assign workflow.

    :param caplog: pytest log-capturing fixture (requested by the original signature).
    """
    wf = Workflow("test_deprecated_pd")
    with warnings.catch_warnings(record=True) as w:
        # Always surface DeprecationWarning inside this context so the recording does not
        # depend on the ambient warning filters, nor on the warning having already been
        # emitted once from the same location earlier in the session.
        warnings.simplefilter("always", DeprecationWarning)
        task = TaskWithCode(
            name="test_deprecated_pd",
            task_type="test",
            code=123,
            version=2,
            process_definition=wf,
        )
        assert len(w) == 1
        assert issubclass(w[-1].category, DeprecationWarning)
        assert "deprecated" in str(w[-1].message)
        assert task.workflow == wf
| |
| |
@pytest.mark.parametrize(
    "val, expected",
    [
        ("a.sh", "echo Test task attribute ext_attr"),
        ("a.zsh", "echo Test task attribute ext_attr"),
        ("echo Test task attribute ext_attr", "echo Test task attribute ext_attr"),
    ],
)
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version",
    return_value=(123, 1),
)
@patch(
    "pydolphinscheduler.core.task.Task.ext",
    new_callable=PropertyMock,
    return_value={".sh", ".zsh"},
)
@patch(
    "pydolphinscheduler.core.task.Task.ext_attr",
    new_callable=PropertyMock,
    return_value="_raw_script",
)
@patch(
    "pydolphinscheduler.core.task.Task._raw_script",
    create=True,
    new_callable=PropertyMock,
)
@patch("pydolphinscheduler.core.task.Task.get_plugin")
def test_task_ext_attr(
    m_plugin, m_raw_script, m_ext_attr, m_ext, m_code_version, val, expected
):
    """Test task attribute ext_attr.

    ``Task._raw_script`` is mocked to yield ``val`` and the mocked plugin's ``read_file``
    to return ``expected``; the task's ``raw_script`` attribute must equal ``expected``
    for every case.
    """
    m_raw_script.return_value = val
    plugin = m_plugin.return_value
    plugin.read_file.return_value = expected
    task = Task("test_task_ext_attr", "test_task_ext_attr")
    assert expected == task.raw_script
| |
| |
@pytest.mark.parametrize(
    "attr, expected",
    [
        # resource plugin given on both the task and its workflow
        (
            dict(
                name="test_task_abtain_res_plugin",
                task_type="TaskType",
                resource_plugin=Local("prefix"),
                workflow=Workflow(name="workflow", resource_plugin=Local("prefix")),
            ),
            "Local",
        ),
        # resource plugin given on the task only
        (
            dict(
                name="test_task_abtain_res_plugin",
                task_type="TaskType",
                resource_plugin=Local("prefix"),
            ),
            "Local",
        ),
        # resource plugin given on the workflow only
        (
            dict(
                name="test_task_abtain_res_plugin",
                task_type="TaskType",
                workflow=Workflow(name="workflow", resource_plugin=Local("prefix")),
            ),
            "Local",
        ),
    ],
)
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version",
    return_value=(123, 1),
)
@patch("pydolphinscheduler.core.task.Task.get_content")
def test_task_obtain_res_plugin(m_get_content, m_code_version, attr, expected):
    """Test task obtaining resource plug-in.

    :param attr: keyword arguments used to build the task.
    :param expected: class name expected for the object returned by ``get_plugin``.
    """
    plugin = Task(**attr).get_plugin()
    assert expected == plugin.__class__.__name__
| |
| |
@pytest.mark.parametrize(
    "attr",
    [
        dict(
            name="test_task_abtain_res_plugin",
            task_type="TaskType",
            workflow=Workflow(name="workflow"),
        ),
    ],
)
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version",
    return_value=(123, 1),
)
@patch("pydolphinscheduler.core.task.Task.get_content")
def test_task_obtain_res_plugin_exception(m_get_content, m_code_version, attr):
    """Test task obtaining resource plug-in exception.

    Neither the task nor its workflow is given a resource plugin, and building the task
    then calling ``get_plugin`` is expected to raise ``PyResPluginException``.
    """
    expected_message = (
        "The execution command of this task is a file, "
        "but the resource plugin is empty"
    )
    with pytest.raises(PyResPluginException, match=expected_message):
        task = Task(**attr)
        task.get_plugin()
| |
| |
@pytest.mark.parametrize(
    "resources, expect",
    [
        (
            ["/dev/test.py"],
            [{"id": 1}],
        ),
        (
            ["/dev/test.py", {"id": 2}],
            [{"id": 1}, {"id": 2}],
        ),
    ],
)
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version",
    return_value=(123, 1),
)
@patch(
    "pydolphinscheduler.core.resource.Resource.get_id_from_database",
    return_value=1,
)
@patch(
    "pydolphinscheduler.core.task.Task.user_name",
    return_value="test_user",
)
def test_python_resource_list(
    mock_user_name, mock_resource, mock_code_version, resources, expect
):
    """Test python task resource list.

    Stacked ``patch`` decorators are applied bottom-up, so the injected mocks arrive in
    the order ``Task.user_name``, ``Resource.get_id_from_database``,
    ``Task.gen_code_and_version``. The mock parameters are named to match.

    :param mock_user_name: mock replacing ``Task.user_name``.
    :param mock_resource: mock replacing ``Resource.get_id_from_database`` (returns 1).
    :param mock_code_version: mock replacing ``Task.gen_code_and_version``.
    :param resources: ``resource_list`` argument passed to the task.
    :param expect: value ``task.resource_list`` is expected to equal.
    """
    task = Task(
        name="python_resource_list.",
        task_type="PYTHON",
        resource_list=resources,
    )
    assert task.resource_list == expect
| |
| |
@patch(
    "pydolphinscheduler.core.task.Task.gen_code_and_version",
    return_value=(123, 1),
)
def test_local_parameter(m_code_version):
    """Test task local_params.

    Covers four situations in turn: only ``local_params`` given, only
    ``input_params``/``output_params`` given, all three given together, and parameters
    appended afterwards through ``add_in`` and ``add_out``.
    """

    def param(prop, direct, type_name, value):
        """Build one expected local-parameter dictionary."""
        return {"prop": prop, "direct": direct, "type": type_name, "value": value}

    def io_params():
        """Return fresh expected entries for the input/output params used below."""
        return [
            param("a", "IN", "INTEGER", 123),
            param("b", "IN", "BOOLEAN", True),
            param("c", "OUT", "VARCHAR", "ccc"),
            param("d", "OUT", "LONG", "123"),
        ]

    base_local_params = [param("base", "IN", "VARCHAR", "2022")]

    # Only explicit local_params.
    task = Task(name="test", task_type="task_type", local_params=base_local_params)
    assert task.local_params == base_local_params

    # Only input_params and output_params.
    task = Task(
        name="test",
        task_type="task_type",
        input_params={"a": 123, "b": True},
        output_params={"c": "ccc", "d": ParameterType.LONG(123)},
    )
    assert task.local_params == io_params()

    # local_params combined with input_params and output_params.
    task = Task(
        name="test",
        task_type="task_type",
        local_params=base_local_params,
        input_params={"a": 123, "b": True},
        output_params={"c": "ccc", "d": ParameterType.LONG(123)},
    )
    expect = [param("base", "IN", "VARCHAR", "2022")] + io_params()
    assert task.local_params == expect

    # Parameters appended after construction; compared order-insensitively.
    task.add_in("e", "${e}")
    task.add_out("f")
    expect = expect + [
        param("e", "IN", "VARCHAR", "${e}"),
        param("f", "OUT", "VARCHAR", ""),
    ]

    actual_sorted = sorted(task.local_params, key=lambda p: (p["prop"], p["direct"]))
    expect_sorted = sorted(expect, key=lambda p: (p["prop"], p["direct"]))
    assert actual_sorted == expect_sorted