[feat] Add execute type to workflow (#9)
Until now, workflows could only be submitted in parallel
mode. This patch lets users specify the execution type of
a workflow.
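
A minimal sketch of the lookup and validation this patch describes, written as a standalone function so it runs without the library. The helper name `resolve_execution_type` and its `env`/`default` parameters are assumptions made for illustration and are not part of pydolphinscheduler; only the variable name `PYDS_WORKFLOW_EXECUTION_TYPE` and the four accepted values come from the diff below. Unlike the patch, which rejects an explicit `None`, this sketch treats `None` as "not given" and falls back to the environment.

```python
import os

# The four values accepted by the validation added in this patch.
EXECUTION_TYPES = ("PARALLEL", "SERIAL_WAIT", "SERIAL_DISCARD", "SERIAL_PRIORITY")


def resolve_execution_type(value=None, env=None, default="parallel"):
    """Hypothetical helper: pick an execution type and normalize it.

    Lookup order: explicit value, then the environment variable, then
    the default. The result is stripped and upper-cased before checking.
    """
    env = os.environ if env is None else env
    if value is None:
        value = env.get("PYDS_WORKFLOW_EXECUTION_TYPE", default)
    normalized = value.strip().upper()
    if normalized not in EXECUTION_TYPES:
        raise ValueError(f"Unexpected execution_type: {value!r}")
    return normalized


# An explicit value takes precedence over the environment.
print(resolve_execution_type("serial_wait", env={}))
```

Passing `env` explicitly keeps the sketch easy to test; real code would normally rely on `os.environ` and the configuration file.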
diff --git a/docs/source/concept.rst b/docs/source/concept.rst
index 9a9527d..de49c9c 100644
--- a/docs/source/concept.rst
+++ b/docs/source/concept.rst
@@ -90,6 +90,38 @@
Make should tenant exists in target machine, otherwise it will raise an error when you try to run command
+Execution Type
+~~~~~~~~~~~~~~
+
+Decides how to run a process definition when it has multiple instances. When the schedule
+interval of a process definition is too short, multiple instances may run at the same time.
+This parameter controls how those process definition instances are run. Currently there are
+four execution types:
+
+* ``parallel`` (default value): all instances are allowed to run even if the previous
+  instance has not finished.
+* ``serial_wait``: each instance waits for the previous instance to finish, and
+  the waiting instances are executed in scheduling order.
+* ``serial_discard``: an instance is discarded (abandoned) if the previous instance
+  has not finished.
+* ``serial_priority``: each instance waits for the previous instance to finish,
+  and the waiting instances are executed in order of process definition priority.
+
+The parameter ``execution_type`` can be set in either of the following ways:
+
+* Direct assignment. Pick one of the execution types above and assign it directly to the
+  parameter ``execution_type``.
+
+ .. code-block:: python
+
+ pd = ProcessDefinition(
+ name="process-definition",
+ execution_type="parallel"
+ )
+
+* Via environment variables or the configuration file. For more detail about these settings,
+  see the :doc:`config` section.
+
Tasks
-----
diff --git a/docs/source/config.rst b/docs/source/config.rst
index 29a143d..3f7fff8 100644
--- a/docs/source/config.rst
+++ b/docs/source/config.rst
@@ -78,41 +78,43 @@
All environment variables as below, and you could modify their value via `Bash <by bash>`_ or `Python OS Module <by python os module>`_
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| Variable Section | Variable Name | description |
-+==================+====================================+====================================================================================================================+
-| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. |
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. |
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow release state, will use its value when workflow does not specify the attribute ``release_state``. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``timezone``. |
-+ +------------------------------------+--------------------------------------------------------------------------------------------------------------------+
-| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. |
-+------------------+------------------------------------+--------------------------------------------------------------------------------------------------------------------+
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| Variable Section | Variable Name | description |
++==================+====================================+=====================================================================================================================+
+| | ``PYDS_JAVA_GATEWAY_ADDRESS`` | Default Java gateway address, will use its value when it is set. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| Java Gateway | ``PYDS_JAVA_GATEWAY_PORT`` | Default Java gateway port, will use its value when it is set. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_JAVA_GATEWAY_AUTO_CONVERT`` | Default boolean Java gateway auto convert, will use its value when it is set. |
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_NAME`` | Default user name, will use when user's ``name`` when does not specify. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_PASSWORD`` | Default user password, will use when user's ``password`` when does not specify. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| Default User | ``PYDS_USER_EMAIL`` | Default user email, will use when user's ``email`` when does not specify. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_PHONE`` | Default user phone, will use when user's ``phone`` when does not specify. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_USER_STATE`` | Default user state, will use when user's ``state`` when does not specify. |
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_PROJECT`` | Default workflow project name, will use its value when workflow does not specify the attribute ``project``. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_TENANT`` | Default workflow tenant, will use its value when workflow does not specify the attribute ``tenant``. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| Default Workflow | ``PYDS_WORKFLOW_USER`` | Default workflow user, will use its value when workflow does not specify the attribute ``user``. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_QUEUE`` | Default workflow queue, will use its value when workflow does not specify the attribute ``queue``. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_WORKER_GROUP`` | Default workflow worker group, will use its value when workflow does not specify the attribute ``worker_group``. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_RELEASE_STATE`` | Default workflow release state, will use its value when workflow does not specify the attribute ``release_state``. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_TIME_ZONE`` | Default workflow time zone, will use its value when workflow does not specify the attribute ``timezone``. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_WARNING_TYPE`` | Default workflow warning type, will use its value when workflow does not specify the attribute ``warning_type``. |
++ +------------------------------------+---------------------------------------------------------------------------------------------------------------------+
+| | ``PYDS_WORKFLOW_EXECUTION_TYPE`` | Default workflow execution type, will use its value when workflow does not specify the attribute ``execution_type``.|
++------------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
.. note::
diff --git a/src/pydolphinscheduler/configuration.py b/src/pydolphinscheduler/configuration.py
index 860f986..2f0c2c0 100644
--- a/src/pydolphinscheduler/configuration.py
+++ b/src/pydolphinscheduler/configuration.py
@@ -189,5 +189,8 @@
WORKFLOW_WARNING_TYPE = os.environ.get(
"PYDS_WORKFLOW_WARNING_TYPE", configs.get("default.workflow.warning_type")
)
+WORKFLOW_EXECUTION_TYPE = os.environ.get(
+ "PYDS_WORKFLOW_EXECUTION_TYPE", configs.get("default.workflow.execution_type")
+)
# End Common Configuration Setting
diff --git a/src/pydolphinscheduler/core/process_definition.py b/src/pydolphinscheduler/core/process_definition.py
index 62de7ed..3487435 100644
--- a/src/pydolphinscheduler/core/process_definition.py
+++ b/src/pydolphinscheduler/core/process_definition.py
@@ -57,6 +57,20 @@
TODO: maybe we should rename this class, currently use DS object name.
+    :param execution_type: Decides how to run a process definition when it has multiple instances.
+        When the schedule interval of a process definition is too short, multiple instances may run
+        at the same time. This parameter controls how those process definition instances are run.
+        Currently there are four execution types:
+
+        * ``PARALLEL``: Default value, all instances are allowed to run even if the previous
+          instance has not finished.
+        * ``SERIAL_WAIT``: Each instance waits for the previous instance to finish, and
+          the waiting instances are executed in scheduling order.
+        * ``SERIAL_DISCARD``: An instance is discarded (abandoned) if the previous instance has
+          not finished.
+        * ``SERIAL_PRIORITY``: Each instance waits for the previous instance to finish, and
+          the waiting instances are executed in order of process definition priority.
+
:param user: The user for current process definition. Will create a new one if it do not exists. If your
parameter ``project`` already exists but project's create do not belongs to ``user``, will grant
``project`` to ``user`` automatically.
@@ -86,6 +100,7 @@
"worker_group",
"warning_type",
"warning_group_id",
+ "execution_type",
"timeout",
"release_state",
"param",
@@ -109,6 +124,7 @@
worker_group: Optional[str] = configuration.WORKFLOW_WORKER_GROUP,
warning_type: Optional[str] = configuration.WORKFLOW_WARNING_TYPE,
warning_group_id: Optional[int] = 0,
+ execution_type: Optional[str] = configuration.WORKFLOW_EXECUTION_TYPE,
timeout: Optional[int] = 0,
release_state: Optional[str] = configuration.WORKFLOW_RELEASE_STATE,
param: Optional[Dict] = None,
@@ -132,6 +148,17 @@
else:
self.warning_type = warning_type.strip().upper()
self.warning_group_id = warning_group_id
+ if execution_type is None or execution_type.strip().upper() not in (
+ "PARALLEL",
+ "SERIAL_WAIT",
+ "SERIAL_DISCARD",
+ "SERIAL_PRIORITY",
+ ):
+ raise PyDSParamException(
+ f"Parameter `execution_type` with unexpect value `{execution_type}`"
+ )
+ else:
+ self._execution_type = execution_type
self.timeout = timeout
self._release_state = release_state
self.param = param
@@ -226,6 +253,16 @@
self._release_state = val.lower()
@property
+ def execution_type(self) -> str:
+ """Get attribute execution_type."""
+ return self._execution_type.upper()
+
+ @execution_type.setter
+ def execution_type(self, val: str) -> None:
+ """Set attribute execution_type."""
+ self._execution_type = val
+
+ @property
def param_json(self) -> Optional[List[Dict]]:
"""Return param json base on self.param."""
# Handle empty dict and None value
@@ -390,6 +427,7 @@
json.dumps(self.param_json),
self.warning_type,
self.warning_group_id,
+ self.execution_type,
self.timeout,
self.worker_group,
self._tenant,
@@ -399,7 +437,6 @@
json.dumps(self.task_definition_json),
json.dumps(self.schedule_json) if self.schedule_json else None,
None,
- None,
)
if len(self.resource_list) > 0:
for res in self.resource_list:
diff --git a/src/pydolphinscheduler/default_config.yaml b/src/pydolphinscheduler/default_config.yaml
index 98d7b99..5ad3064 100644
--- a/src/pydolphinscheduler/default_config.yaml
+++ b/src/pydolphinscheduler/default_config.yaml
@@ -56,3 +56,6 @@
# change to ``FAILURE`` if you want to warn users when workflow failed. All available enum value are
# ``NONE``, ``SUCCESS``, ``FAILURE``, ``ALL``
warning_type: NONE
+ # Default execution type, which controls how multiple instances of a workflow are run. The default value
+ # ``parallel`` runs all workflow instances in parallel; the other values are ``SERIAL_WAIT``, ``SERIAL_DISCARD``, ``SERIAL_PRIORITY``
+ execution_type: parallel
diff --git a/src/pydolphinscheduler/java_gateway.py b/src/pydolphinscheduler/java_gateway.py
index 54bb0a3..cd03d32 100644
--- a/src/pydolphinscheduler/java_gateway.py
+++ b/src/pydolphinscheduler/java_gateway.py
@@ -254,6 +254,7 @@
global_params: str,
warning_type: str,
warning_group_id: int,
+ execution_type: str,
timeout: int,
worker_group: str,
tenant_code: str,
@@ -262,7 +263,6 @@
task_definition_json: str,
schedule: Optional[str] = None,
other_params_json: Optional[str] = None,
- execution_type: Optional[str] = None,
):
"""Create or update process definition through java gateway."""
return self.java_gateway.entry_point.createOrUpdateProcessDefinition(
diff --git a/tests/core/test_process_definition.py b/tests/core/test_process_definition.py
index 30445bf..c8fffc2 100644
--- a/tests/core/test_process_definition.py
+++ b/tests/core/test_process_definition.py
@@ -67,6 +67,7 @@
("worker_group", configuration.WORKFLOW_WORKER_GROUP),
("warning_type", configuration.WORKFLOW_WARNING_TYPE),
("warning_group_id", 0),
+ ("execution_type", configuration.WORKFLOW_EXECUTION_TYPE.upper()),
("release_state", 1),
],
)
@@ -89,6 +90,7 @@
("worker_group", str, "worker_group"),
("warning_type", str, "FAILURE"),
("warning_group_id", int, 1),
+ ("execution_type", str, "PARALLEL"),
("timeout", int, 1),
("param", dict, {"key": "value"}),
(
@@ -211,6 +213,22 @@
@pytest.mark.parametrize(
+ "val",
+ [
+ "ALLL",
+ "",
+ None,
+ ],
+)
+def test_execute_type_not_support_type(val: str):
+    """Test process definition rejects unsupported ``execution_type`` values."""
+ with pytest.raises(
+ PyDSParamException, match="Parameter `execution_type` with unexpect value.*?"
+ ):
+ ProcessDefinition(TEST_PROCESS_DEFINITION_NAME, execution_type=val)
+
+
+@pytest.mark.parametrize(
"param, expect",
[
(
@@ -321,6 +339,7 @@
"workerGroup": configuration.WORKFLOW_WORKER_GROUP,
"warningType": configuration.WORKFLOW_WARNING_TYPE,
"warningGroupId": 0,
+ "executionType": "PARALLEL",
"timeout": 0,
"releaseState": 1,
"param": None,
diff --git a/tests/utils/test_yaml_parser.py b/tests/utils/test_yaml_parser.py
index ad3aaf7..3abdda6 100644
--- a/tests/utils/test_yaml_parser.py
+++ b/tests/utils/test_yaml_parser.py
@@ -63,6 +63,7 @@
"default.workflow.release_state": ("online", "offline"),
"default.workflow.time_zone": ("Asia/Shanghai", "Europe/Amsterdam"),
"default.workflow.warning_type": ("NONE", "SUCCESS"),
+ "default.workflow.execution_type": ("parallel", "serial_wait"),
},
]