.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.

Concepts
========

In this section, you will learn the core concepts of *PyDolphinScheduler*.

Workflow
--------

Workflow describes everything about a job except its `tasks`_ and `tasks dependence`_: it includes the
name, the schedule interval, and the schedule start time and end time.

Workflow can be initialized in a normal assignment statement or within a context manager.

.. code-block:: python

    # Initialization with an assignment statement
    workflow = Workflow(name="my first workflow")

    # Or with a context manager
    with Workflow(name="my first workflow") as workflow:
        workflow.submit()
Workflow is the main object for communication between *PyDolphinScheduler* and the DolphinScheduler daemon.
After the workflow and its tasks are declared, you can use `submit` and `run` to send your definition to the server.

If you just want to submit your definition and create the workflow without running it, use the method `submit`.
But if you want to run the workflow right after submitting it, use the method `run`.

.. code-block:: python

    # Just submit the definition, without running it
    workflow.submit()

    # Both submit and run the definition
    workflow.run()

Schedule
~~~~~~~~

We use the parameter `schedule` to determine the schedule interval of a workflow. *PyDolphinScheduler* supports a
seven-field crontab expression, and the meaning of each position is as below:

.. code-block:: text

    * * * * * * *
    ┬ ┬ ┬ ┬ ┬ ┬ ┬
    │ │ │ │ │ │ │
    │ │ │ │ │ │ └─── year
    │ │ │ │ │ └───── day of week (1 - 7) (1 is Sunday; names such as `SUN` can also be used)
    │ │ │ │ └─────── month (1 - 12)
    │ │ │ └───────── day of month (1 - 31)
    │ │ └─────────── hour (0 - 23)
    │ └───────────── min (0 - 59)
    └─────────────── second (0 - 59)

Here are some example crontab expressions:

- `0 0 0 * * ? *`: The workflow executes every day at 00:00:00.
- `10 2 * * * ? *`: The workflow executes every hour, at two minutes and ten seconds past the hour.
- `10,11 20 0 1,2 * ? *`: The workflow executes on the first and second day of each month, at 00:20:10 and 00:20:11.
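
To sanity-check a seven-field expression before assigning it to ``schedule``, you can split it into named fields.
The helper below is a plain-Python illustration written for this guide; it is not part of the *PyDolphinScheduler* API.

.. code-block:: python

    # Order of the seven crontab fields, matching the diagram above.
    FIELD_NAMES = [
        "second", "minute", "hour", "day of month",
        "month", "day of week", "year",
    ]

    def split_cron(expression: str) -> dict:
        """Split a seven-field crontab expression into named fields."""
        fields = expression.split()
        if len(fields) != 7:
            raise ValueError(f"expected 7 fields, got {len(fields)}")
        return dict(zip(FIELD_NAMES, fields))

    # The daily-at-midnight example from above: the hour field is "0".
    print(split_cron("0 0 0 * * ? *")["hour"])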

Tenant
~~~~~~

Tenant is the user that runs the task command on the host machine or in the virtual machine. It can be assigned with a
simple string. You should change the tenant value to a tenant that exists on your host. It is configured in
`config.yaml` in your pydolphinscheduler ``PYDS_HOME``, or via the :doc:`CLI <cli>`:

.. code-block:: bash

    pydolphinscheduler config --set default.user.tenant <YOUR-TENANT-NAME>

.. note::

    Make sure the tenant exists on the target machine, otherwise an error will be raised when you try to run the command.

Execution Type
~~~~~~~~~~~~~~

Execution type decides the behavior when a workflow has multiple instances. When the workflow's
schedule interval is too short, multiple instances may run at the same time. We can use this
parameter to control how those workflow instances run. Currently, there are four execution types:

* ``parallel`` (default value): all instances are allowed to run even if the previous
  instance has not finished.
* ``serial_wait``: every instance waits for the previous instance to finish, and
  the waiting instances are executed in scheduling order.
* ``serial_discard``: an instance is discarded (abandoned) if the previous instance
  has not finished.
* ``serial_priority``: every instance waits for the previous instance to finish,
  and the waiting instances are executed in order of workflow priority.

The parameter ``execution_type`` can be set

* In a direct assignment statement. You can pick an execution type from the list above and assign it
  directly to the parameter ``execution_type``.

  .. code-block:: python

      workflow = Workflow(
          name="workflow_name",
          execution_type="parallel"
      )

* Via environment variables or the configuration file; for more detail about those settings, you can
  read the :doc:`config` section.

Alert
~~~~~

Alert is the way to notify users when a workflow instance succeeds or fails. We can set an alert with the parameters
``warning_type`` and ``warning_group_id`` in the workflow definition.

* ``warning_type`` represents the type of alert: when a workflow instance ends in one of the covered statuses, it
  triggers the alert. The value of ``warning_type`` can be one of ``failure``, ``success``, ``all``, ``none``.
* ``warning_group_id`` represents the alert group; you can get the group id from the DolphinScheduler web UI.
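
As a minimal sketch (assuming ``Workflow`` is imported as in the earlier examples, and that an alert group with
id ``1`` exists in your DolphinScheduler instance; look up the real id in the web UI):

.. code-block:: python

    # Alert the group with id 1 whenever a workflow instance fails.
    # `warning_group_id=1` is a placeholder for a group id from your web UI.
    workflow = Workflow(
        name="alert_on_failure",
        warning_type="failure",
        warning_group_id=1,
    )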

Tasks
-----

Task is the minimum unit that runs an actual job, and it is a node of the DAG (directed acyclic graph). You can define
what you want in a task. A task has some required parameters that make it unique and well defined.

Here we use :py:meth:`pydolphinscheduler.tasks.Shell` as an example. The parameters `name` and `command` are required
and must be provided. Parameter `name` sets the name of the task, and parameter `command` declares the command you wish
to run in this task.

.. code-block:: python

    # We name this task "shell", and it just runs the command `echo shell task`
    shell_task = Shell(name="shell", command="echo shell task")

If you want to see all types of tasks, see :doc:`tasks/index`.

Tasks Dependence
~~~~~~~~~~~~~~~~

You can define many tasks in one single `Workflow`_. If all those tasks run in parallel,
you can leave them alone without adding any additional information. But if some tasks should
not run until their upstream tasks in the workflow have finished, you should set task dependence on them. There are
two main ways to set task dependence, and both are easy: you can use the bit-shift operators `>>` and `<<`, or the
task methods `set_downstream` and `set_upstream`.

.. code-block:: python

    # Set task1 as the upstream of task2
    task1 >> task2
    # You can use the method `set_downstream` too; it is the same as `task1 >> task2`
    task1.set_downstream(task2)

    # Set task1 as the downstream of task2
    task1 << task2
    # It is the same as the method `set_upstream`
    task1.set_upstream(task2)

    # Besides, we can set dependence between a task and a sequence of tasks;
    # here we set `task1` as the upstream of both `task2` and `task3`. It is
    # useful when several tasks share the same dependence.
    task1 >> [task2, task3]

Task With Workflow
~~~~~~~~~~~~~~~~~~

In most data orchestration cases, you should assign the attribute `workflow` to a task instance to
decide which workflow the task belongs to. You can set `workflow` in a normal assignment or in context manager mode.

.. code-block:: python

    # Normal assignment: explicitly declare and pass the `Workflow` instance to the task
    workflow = Workflow(name="my first workflow")
    shell_task = Shell(name="shell", command="echo shell task", workflow=workflow)

    # Context manager: the `Workflow` instance is implicitly attached to the task
    with Workflow(name="my first workflow") as workflow:
        shell_task = Shell(name="shell", command="echo shell task")

With `Workflow`_, `Tasks`_ and `Tasks Dependence`_ together, we can build a workflow with multiple tasks.

Task Group
~~~~~~~~~~

A task group can manage and control the maximum number of concurrently running tasks. This is particularly
useful when you want to limit the simultaneous execution of various task types. For instance, in an ETL
(Extract, Transform, Load) job where data is extracted from a source database, it's crucial to control the
parallelism of extract tasks to prevent an excessive number of connections to the source database. This is
where a task group comes into play. There are two key parameters, ``task_group_id`` and ``task_group_priority``,
that determine the behavior of the task group.

* ``task_group_id``: an integer used to identify the task group. You can set a ``task_group_id`` to
  restrict the parallelism of tasks. The ``task_group_id`` can be found in the DolphinScheduler web UI. The
  default value is ``0``, which means there are no restrictions for this task group.
* ``task_group_priority``: an integer used to define the priority of the task group. When different tasks
  share the same ``task_group_id``, the task group's priority comes into play, controlling the order in which
  they run. Higher values indicate higher priority. The default value is ``0``, which means there is no
  specific priority for this task group, and tasks will run in the order they were created.

Here's an example in Python:

.. code-block:: python

    extract = Shell(
        name="extract",
        command="echo 'Some extract command here'",
        task_group_id=1,
        task_group_priority=123
    )
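
To make the priority rule concrete, the scheduler-side selection can be pictured as picking, among the waiting tasks
that share a ``task_group_id``, the one with the highest ``task_group_priority`` first. This is a plain-Python
illustration of that rule, not DolphinScheduler's actual scheduling code:

.. code-block:: python

    # Waiting tasks that share the same task_group_id; only one slot is free.
    waiting = [
        {"name": "extract_a", "task_group_priority": 10},
        {"name": "extract_b", "task_group_priority": 123},
        {"name": "extract_c", "task_group_priority": 0},
    ]

    # The highest priority value wins the free slot.
    next_task = max(waiting, key=lambda t: t["task_group_priority"])
    print(next_task["name"])  # extract_b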

Resource Files
--------------

While a workflow is running, we may need some resource files to help the tasks run. A common situation
is that we already have some executable files locally and need to schedule them at a specific time, or add them
to an existing workflow as new tasks. Of course, we can upload those files to the target machine and run them
in a :doc:`shell task <tasks/shell>` by referencing the absolute path of the file. But if we have more than one
machine to run tasks on, we have to upload those files to each of them, which is neither convenient nor flexible,
because we may need to change our resource files from time to time.

A more pydolphinscheduler-native way is to upload those files together with the `workflow`_ and use them in tasks.
For example, you have a bash script named ``echo-ten.sh`` locally that contains code like this:

.. code-block:: bash

    #!/usr/bin/env bash
    max=10
    for ((i=1; i <= $max; ++i)); do
        echo "$i"
    done

and you want to use it in a workflow without copy-pasting it into the shell task parameter ``command``. You can
use this mechanism to upload it to the resource center when you create the workflow:

.. code-block:: python

    # Read the local file content
    file_name = "echo-ten.sh"

    with open(file_name, "r") as f:
        content = f.read()

    with Workflow(
        name="upload_and_run",
        resource_list=[
            Resource(name=file_name, content=content),
        ],
    ) as workflow:
        ...

And when we call :code:`workflow.run()`, the new file named ``echo-ten.sh`` will be uploaded to the DolphinScheduler
resource center.

After that, we can use this file in our task by referencing it by name; in this case we can use a
:doc:`shell task <tasks/shell>` to run it.

.. code-block:: python

    # We use a `shell` task to run the `echo-ten.sh` file
    shell_task = Shell(
        name="run",
        command=f"bash {file_name}",
        resource_list=[
            file_name
        ],
    )

While the workflow is running, the file is downloaded to the task's runtime working directory, which means you can
execute it. We execute the file with ``bash`` and reference it directly by name.

Note that we can also reference resource files that already exist in the DolphinScheduler resource center, which
means we can use resource center files in a task by name without uploading them again. And we can upload files to
the resource center on their own, without a workflow.

.. code-block:: python

    # Upload a file to the resource center
    from pydolphinscheduler.core.resource import Resource

    resource = Resource(
        name="bare-create.py",
        user_name="<USER-MUST-EXISTS-WITH-TENANT>",
        content="print('Bare create resource')",
    )
    resource.create_or_update_resource()

After that, we can see a new file named ``bare-create.py`` created in the resource center.

.. note::

    Both the parameter ``resource_list`` in the workflow and in the task are lists, which means you can upload and
    reference multiple files. For more complex usage, you can read :doc:`howto/multi-resources`.
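
As a sketch of the multi-file case (the file names here are hypothetical, and ``Workflow``, ``Resource``, and
``Shell`` are assumed to be imported as in the earlier examples):

.. code-block:: python

    # Upload two hypothetical scripts together with the workflow ...
    with Workflow(
        name="multi_resource",
        resource_list=[
            Resource(name="prepare.sh", content="echo prepare"),
            Resource(name="load.sh", content="echo load"),
        ],
    ) as workflow:
        # ... and reference both of them by name in a single task.
        shell_task = Shell(
            name="run_both",
            command="bash prepare.sh && bash load.sh",
            resource_list=["prepare.sh", "load.sh"],
        )
        workflow.run()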

Local Parameters
----------------

In DolphinScheduler, we can define parameters in tasks, also known as local parameters.

We can set parameters as variables in tasks to better manage our tasks.

For example:

.. literalinclude:: ../../src/pydolphinscheduler/examples/local_parameter_example.py
    :start-after: [start parameter example]
    :end-before: [end parameter example]
    :language: python

There are two ways to define local parameters:

.. literalinclude:: ../../src/pydolphinscheduler/examples/local_parameter_example.py
    :start-after: [start parameter define]
    :end-before: [end parameter define]
    :language: python

Full example:

.. literalinclude:: ../../src/pydolphinscheduler/examples/local_parameter_example.py
    :start-after: [start workflow_declare]
    :end-before: [end workflow_declare]
    :language: python

Authentication Token
--------------------

pydolphinscheduler uses a token for authentication when communicating with the DolphinScheduler server, and it ships
with a default auth token so that it works out of the box. For security reasons, we highly recommend that you change
to your own auth token when you deploy in a production environment or test DolphinScheduler on a public network. The
auth token keyword is ``auth_token``, and it can be set in multiple ways; read the :doc:`config` section for more
detail.
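
For example, assuming the configuration key follows the same ``section.key`` pattern shown for the tenant earlier
(the exact key name may differ between versions, so check your ``config.yaml``), the CLI call would look like:

.. code-block:: bash

    # Replace the default token with your own secret value
    pydolphinscheduler config --set java_gateway.auth_token <YOUR-AUTH-TOKEN>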