.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Tutorial on the Taskflow API
============================
This tutorial builds on the regular Airflow Tutorial and focuses specifically
on writing data pipelines using the Taskflow API paradigm, which was introduced as
part of Airflow 2.0, and contrasts this with DAGs written using the traditional paradigm.
The data pipeline chosen here is a simple ETL pattern with
three separate tasks for Extract, Transform, and Load.
Example "Taskflow API" ETL Pipeline
-----------------------------------
Here is a very simple ETL pipeline using the Taskflow API paradigm. A more detailed
explanation is given below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]
It's a DAG definition file
--------------------------
If this is the first DAG file you are looking at, please note that this Python script
is interpreted by Airflow and is a configuration file for your data pipeline.
For a complete introduction to DAG files, please look at the core :doc:`Airflow tutorial<tutorial>`
which covers DAG structure and definitions extensively.
Instantiate a DAG
-----------------
We are creating a DAG, which is a collection of our tasks with dependencies between
them. This is a very simple definition, since we just want the DAG to be run
when we set this up with Airflow, without any retries or complex scheduling.
In this example, please notice that we are creating this DAG using the ``@dag`` decorator
as shown below, with the Python function name acting as the DAG identifier.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
:language: python
:start-after: [START instantiate_dag]
:end-before: [END instantiate_dag]
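
For orientation, a minimal sketch of this pattern is shown below; the function name
``example_etl_pipeline`` and the scheduling arguments are illustrative placeholders, not taken
from the example file.

.. code-block:: python

    from datetime import datetime

    from airflow.decorators import dag


    @dag(
        schedule_interval=None,
        start_date=datetime(2021, 1, 1),
        catchup=False,
        tags=["example"],
    )
    def example_etl_pipeline():
        # Task definitions and their invocations go inside this function.
        ...
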
Tasks
-----
In this data pipeline, tasks are created based on Python functions using the ``@task`` decorator
as shown below. The function name acts as a unique identifier for the task.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
:language: python
:start-after: [START extract]
:end-before: [END extract]
The returned value, which in this case is a dictionary, will be made available for use in later tasks.
The Transform and Load tasks are created in the same manner as the Extract task shown above.
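
To make the data hand-off concrete, here is a rough sketch of a downstream task that consumes
such a returned dictionary; the function body is illustrative rather than the exact code from
the example file.

.. code-block:: python

    from airflow.decorators import task


    @task()
    def transform(order_data_dict: dict):
        # The argument arrives as a plain Python dictionary; Airflow resolves
        # the underlying XCom automatically when the task runs.
        total_order_value = sum(order_data_dict.values())
        return {"total_order_value": total_order_value}
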
Main flow of the DAG
--------------------
Now that we have the Extract, Transform, and Load tasks defined based on the Python functions,
we can move to the main part of the DAG.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
:language: python
:start-after: [START main_flow]
:end-before: [END main_flow]
That's it, we are done!
We have invoked the Extract task, obtained the order data from there, sent it over to
the Transform task for summarization, and then invoked the Load task with the summarized data.
The dependencies between the tasks, and the passing of data between these tasks (which could be
running on different workers on different nodes of the network), are all handled by Airflow.
Now, to actually enable this to be run as a DAG, we invoke the Python function
``tutorial_taskflow_api_etl`` set up using the ``@dag`` decorator earlier, as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
:language: python
:start-after: [START dag_invocation]
:end-before: [END dag_invocation]
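
Putting the pieces together, the overall shape of such a pipeline looks roughly like the
self-contained sketch below; the DAG name, sample data, and scheduling arguments are
illustrative stand-ins for the code in the example file.

.. code-block:: python

    import json
    from datetime import datetime

    from airflow.decorators import dag, task


    @dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False)
    def example_taskflow_etl():
        @task()
        def extract():
            # Simulated order data for this sketch.
            return json.loads('{"1001": 301.27, "1002": 433.21}')

        @task(multiple_outputs=True)
        def transform(order_data_dict: dict):
            return {"total_order_value": sum(order_data_dict.values())}

        @task()
        def load(total_order_value: float):
            print(f"Total order value is: {total_order_value:.2f}")

        # Calling the decorated functions wires up both the data passing and
        # the task ordering: extract -> transform -> load.
        order_summary = transform(extract())
        load(order_summary["total_order_value"])


    # Invoking the @dag-decorated function is what actually creates the DAG.
    example_dag = example_taskflow_etl()
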
But how?
--------
For experienced Airflow DAG authors, this is startlingly simple! Let's contrast this with
how this DAG had to be written before Airflow 2.0:
.. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]
All of the processing shown above is being done in the new Airflow 2.0 DAG as well, but
it is all abstracted from the DAG developer.
Let's examine this in detail by looking at the Transform task in isolation since it is
in the middle of the data pipeline. In Airflow 1.x, this task is defined as shown below:
.. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py
:language: python
:start-after: [START transform_function]
:end-before: [END transform_function]
As we see here, the data being processed in the Transform function is passed to it using XCom
variables. In turn, the summarized data from the Transform function is also placed
into another XCom variable, which will then be used by the Load task.
Contrast that with the Taskflow API in Airflow 2.0, as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
:language: python
:start-after: [START transform]
:end-before: [END transform]
All of the XCom usage for data passing between these tasks is abstracted away from the DAG author
in Airflow 2.0. However, XCom variables are used behind the scenes and can be viewed using
the Airflow UI as necessary for debugging or DAG monitoring.
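
As an illustration of what happens behind the scenes, the value returned by a decorated task is
stored as an XCom under the default key ``return_value``, so a classic Python callable could
still pull it explicitly; the ``inspect_extract_output`` function below is hypothetical and not
part of the example DAG.

.. code-block:: python

    def inspect_extract_output(ti):
        # Pull the dictionary that the Taskflow-style "extract" task returned.
        # With no key given, xcom_pull uses the default key "return_value".
        order_data = ti.xcom_pull(task_ids="extract")
        print(order_data)
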
Similarly, task dependencies are automatically generated within the Taskflow API based on the
functional invocation of tasks. In Airflow 1.x, tasks had to be explicitly created and their
dependencies specified as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_etl_dag.py
:language: python
:start-after: [START main_flow]
:end-before: [END main_flow]
In contrast, with the Taskflow API in Airflow 2.0, the invocation itself automatically generates
the dependencies as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api_etl.py
:language: python
:start-after: [START main_flow]
:end-before: [END main_flow]
Multiple outputs inference
--------------------------
Tasks can also infer multiple outputs by using dict Python typing.
.. code-block:: python

    @task
    def identity_dict(x: int, y: int) -> Dict[str, int]:
        return {"x": x, "y": y}

By using the typing ``Dict`` for the function return type, the ``multiple_outputs`` parameter
is automatically set to ``True``.
Note that if you manually set the ``multiple_outputs`` parameter, the inference is disabled and
the parameter value is used.
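
With multiple outputs enabled, each key of the returned dictionary is pushed as its own XCom,
so downstream tasks can consume individual values. Here is a brief sketch, in which the
``print_x`` task is a hypothetical consumer.

.. code-block:: python

    @task
    def print_x(x: int):
        print(x)


    # Each dictionary key can be addressed individually on the returned value.
    result = identity_dict(1, 2)
    print_x(result["x"])
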
What's Next?
------------
You have seen how simple it is to write DAGs using the Taskflow API paradigm within Airflow 2.0. Please do
read the :ref:`Concepts page<concepts>` for a detailed explanation of Airflow concepts such as DAGs, Tasks,
Operators, etc., and the :ref:`concepts:task_decorator` in particular.
More details about the Taskflow API can be found in the Airflow Improvement Proposal
`AIP-31: "Taskflow API" for clearer/simpler DAG definition <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=148638736>`__
and specifically within the Concepts guide at :ref:`Concepts - Taskflow API<concepts:task_flow_api>`.