blob: 4348e75e6d8b757a661452fbe02e17ee90ff0df0 [file] [log] [blame]
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Working with TaskFlow
=====================
This tutorial builds on the regular Airflow Tutorial and focuses specifically
on writing data pipelines using the TaskFlow API paradigm which is introduced as
part of Airflow 2.0 and contrasts this with DAGs written using the traditional paradigm.
The data pipeline chosen here is a simple pattern with
three separate Extract, Transform, and Load tasks.
Example "TaskFlow API" Pipeline
-------------------------------
Here is a very simple pipeline using the TaskFlow API paradigm. A more detailed
explanation is given below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]
It's a DAG definition file
--------------------------
If this is the first DAG file you are looking at, please note that this Python script
is interpreted by Airflow and is a configuration file for your data pipeline.
For a complete introduction to DAG files, please look at the core :doc:`fundamentals tutorial<fundamentals>`
which covers DAG structure and definitions extensively.
Instantiate a DAG
-----------------
We are creating a DAG which is the collection of our tasks with dependencies between
the tasks. This is a very simple definition, since we just want the DAG to be run
when we set this up with Airflow, without any retries or complex scheduling.
In this example, please notice that we are creating this DAG using the ``@dag`` decorator
as shown below, with the Python function name acting as the DAG identifier.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py
:language: python
:start-after: [START instantiate_dag]
:end-before: [END instantiate_dag]
Now to actually enable this to be run as a DAG, we invoke the Python function
``tutorial_taskflow_api`` set up using the ``@dag`` decorator earlier, as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py
:language: python
:start-after: [START dag_invocation]
:end-before: [END dag_invocation]
.. versionchanged:: 2.4
It's no longer required to "register" the DAG into a global variable for Airflow to be able to detect the dag if that DAG is used inside a ``with`` block, or if it is the result of a ``@dag`` decorated function.
Tasks
-----
In this data pipeline, tasks are created based on Python functions using the ``@task`` decorator
as shown below. The function name acts as a unique identifier for the task.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py
:language: python
:dedent: 4
:start-after: [START extract]
:end-before: [END extract]
The returned value, which in this case is a dictionary, will be made available for use in later tasks.
The Transform and Load tasks are created in the same manner as the Extract task shown above.
Main flow of the DAG
--------------------
Now that we have the Extract, Transform, and Load tasks defined based on the Python functions,
we can move to the main part of the DAG.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py
:language: python
:dedent: 4
:start-after: [START main_flow]
:end-before: [END main_flow]
That's it, we are done!
We have invoked the Extract task, obtained the order data from there and sent it over to
the Transform task for summarization, and then invoked the Load task with the summarized data.
The dependencies between the tasks and the passing of data between these tasks which could be
running on different workers on different nodes on the network is all handled by Airflow.
Now to actually enable this to be run as a DAG, we invoke the Python function
``tutorial_taskflow_api`` set up using the ``@dag`` decorator earlier, as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py
:language: python
:start-after: [START dag_invocation]
:end-before: [END dag_invocation]
But how?
--------
For experienced Airflow DAG authors, this is startlingly simple! Let's contrast this with
how this DAG had to be written before Airflow 2.0 below:
.. exampleinclude:: /../../airflow/example_dags/tutorial_dag.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]
All of the processing shown above is being done in the new Airflow 2.0 dag as well, but
it is all abstracted from the DAG developer.
Let's examine this in detail by looking at the Transform task in isolation since it is
in the middle of the data pipeline. In Airflow 1.x, this task is defined as shown below:
.. exampleinclude:: /../../airflow/example_dags/tutorial_dag.py
:language: python
:dedent: 4
:start-after: [START transform_function]
:end-before: [END transform_function]
As we see here, the data being processed in the Transform function is passed to it using XCom
variables. In turn, the summarized data from the Transform function is also placed
into another XCom variable which will then be used by the Load task.
Contrasting that with TaskFlow API in Airflow 2.0 as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py
:language: python
:dedent: 4
:start-after: [START transform]
:end-before: [END transform]
All of the XCom usage for data passing between these tasks is abstracted away from the DAG author
in Airflow 2.0. However, XCom variables are used behind the scenes and can be viewed using
the Airflow UI as necessary for debugging or DAG monitoring.
Similarly, task dependencies are automatically generated within TaskFlows based on the
functional invocation of tasks. In Airflow 1.x, tasks had to be explicitly created and
dependencies specified as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_dag.py
:language: python
:dedent: 4
:start-after: [START main_flow]
:end-before: [END main_flow]
In contrast, with the TaskFlow API in Airflow 2.0, the invocation itself automatically generates
the dependencies as shown below.
.. exampleinclude:: /../../airflow/example_dags/tutorial_taskflow_api.py
:language: python
:dedent: 4
:start-after: [START main_flow]
:end-before: [END main_flow]
Reusing a decorated task
-------------------------
Decorated tasks are flexible. You can reuse a decorated task in multiple DAGs, overriding the task
parameters such as the ``task_id``, ``queue``, ``pool``, etc.
Below is an example of how you can reuse a decorated task in multiple DAGs:
.. code-block:: python
from airflow.decorators import task, dag
from datetime import datetime
@task
def add_task(x, y):
print(f"Task args: x={x}, y={y}")
return x + y
@dag(start_date=datetime(2022, 1, 1))
def mydag():
start = add_task.override(task_id="start")(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"add_start_{i}")(start, i)
@dag(start_date=datetime(2022, 1, 1))
def mydag2():
start = add_task(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"new_add_task_{i}")(start, i)
first_dag = mydag()
second_dag = mydag2()
You can also import the above ``add_task`` and use it in another DAG file.
Suppose the ``add_task`` code lives in a file called ``common.py``. You can do this:
.. code-block:: python
from common import add_task
from airflow.decorators import dag
from datetime import datetime
@dag(start_date=datetime(2022, 1, 1))
def use_add_task():
start = add_task.override(priority_weight=3)(1, 2)
for i in range(3):
start >> add_task.override(task_id=f"new_add_task_{i}", retries=4)(start, i)
created_dag = use_add_task()
Using the TaskFlow API with complex/conflicting Python dependencies
-------------------------------------------------------------------
If you have tasks that require complex or conflicting requirements then you will have the ability to use the
TaskFlow API with either Python virtual environment (since 2.0.2), Docker container (since version 2.2.0) or
or ExternalPythonOperator or KubernetesPodOperator (since 2.4.0).
This functionality allows a much more comprehensive range of use-cases for the TaskFlow API,
as you are not limited to the packages and system libraries of the Airflow worker. For all cases of
the decorated functions described below, you have to make sure the functions are serializable and that
they only use local imports for additional dependencies you use. Those imported additional libraries must
be available in the target environment - they do not need to be available in the main Airflow environment.
Which of the operators you should use, depend on several factors:
* whether you are running Airflow with access to Docker engine or Kubernetes
* whether you can afford an overhead to dynamically create a virtual environment with the new dependencies
* whether you can deploy a pre-existing, immutable Python environment for all Airflow components.
These options should allow for far greater flexibility for users who wish to keep their workflows simpler
and more Pythonic - and allow you to keep complete logic of your DAG in the DAG itself.
You can also get more context about the approach of managing conflicting dependencies, including more detailed
explanation on boundaries and consequences of each of the options in
:ref:`Best practices for handling conflicting/complex Python dependencies <best_practices/handling_conflicting_complex_python_dependencies>`
Virtualenv created dynamically for each task
............................................
The simplest approach is to create dynamically (every time a task is run) a separate virtual environment on the
same machine, you can use the ``@task.virtualenv`` decorator. The decorator allows
you to create dynamically a new virtualenv with custom libraries and even a different Python version to
run your function.
.. _taskflow/virtualenv_example:
Example (dynamically created virtualenv):
.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
:language: python
:dedent: 4
:start-after: [START howto_operator_python_venv]
:end-before: [END howto_operator_python_venv]
Using Python environment with pre-installed dependencies
........................................................
A bit more involved ``@task.external_python`` decorator allows you to run an Airflow task in pre-defined,
immutable virtualenv (or Python binary installed at system level without virtualenv).
This virtualenv or system python can also have different set of custom libraries installed and must be
made available in all workers that can execute the tasks in the same location.
.. _taskflow/external_python_example:
Example with ``@task.external_python`` (using immutable, pre-existing virtualenv):
.. exampleinclude:: /../../airflow/example_dags/example_python_operator.py
:language: python
:dedent: 4
:start-after: [START howto_operator_external_python]
:end-before: [END howto_operator_external_python]
Dependency separation using Docker Operator
...........................................
If your Airflow workers have access to a docker engine, you can instead use a ``DockerOperator``
and add any needed arguments to correctly run the task. Please note that the docker
image must have a working Python installed and take in a bash command as the ``command`` argument.
It is worth noting that the Python source code (extracted from the decorated function) and any
callable args are sent to the container via (encoded and pickled) environment variables so the
length of these is not boundless (the exact limit depends on system settings).
Below is an example of using the ``@task.docker`` decorator to run a Python task.
.. _taskflow/docker_example:
.. exampleinclude:: /../../tests/system/providers/docker/example_taskflow_api_docker_virtualenv.py
:language: python
:dedent: 4
:start-after: [START transform_docker]
:end-before: [END transform_docker]
Notes on using the operator:
.. note:: Using ``@task.docker`` decorator in one of the earlier Airflow versions
Since ``@task.docker`` decorator is available in the docker provider, you might be tempted to use it in
Airflow version before 2.2, but this is not going to work. You will get this error if you try:
.. code-block:: text
AttributeError: '_TaskDecorator' object has no attribute 'docker'
You should upgrade to Airflow 2.2 or above in order to use it.
Dependency separation using Kubernetes Pod Operator
...................................................
If your Airflow workers have access to Kubernetes, you can instead use a ``KubernetesPodOperator``
and add any needed arguments to correctly run the task.
Below is an example of using the ``@task.kubernetes`` decorator to run a Python task.
.. _taskflow/kubernetes_example:
.. exampleinclude:: /../../tests/system/providers/cncf/kubernetes/example_kubernetes_decorator.py
:language: python
:dedent: 4
:start-after: [START howto_operator_kubernetes]
:end-before: [END howto_operator_kubernetes]
Notes on using the operator:
.. note:: Using ``@task.kubernetes`` decorator in one of the earlier Airflow versions
Since ``@task.kubernetes`` decorator is available in the docker provider, you might be tempted to use it in
Airflow version before 2.4, but this is not going to work. You will get this error if you try:
.. code-block:: text
AttributeError: '_TaskDecorator' object has no attribute 'kubernetes'
You should upgrade to Airflow 2.4 or above in order to use it.
Multiple outputs inference
--------------------------
Tasks can also infer multiple outputs by using dict Python typing.
.. code-block:: python
@task
def identity_dict(x: int, y: int) -> Dict[str, int]:
return {"x": x, "y": y}
By using the typing ``Dict`` for the function return type, the ``multiple_outputs`` parameter
is automatically set to true.
Note, If you manually set the ``multiple_outputs`` parameter the inference is disabled and
the parameter value is used.
Adding dependencies between decorated and traditional tasks
-----------------------------------------------------------
The above tutorial shows how to create dependencies between TaskFlow functions. However, dependencies can also
be set between traditional tasks (such as :class:`~airflow.operators.bash.BashOperator`
or :class:`~airflow.sensors.filesystem.FileSensor`) and TaskFlow functions.
Building this dependency is shown in the code below:
.. code-block:: python
@task()
def extract_from_file():
"""
#### Extract from file task
A simple Extract task to get data ready for the rest of the data
pipeline, by reading the data from a file into a pandas dataframe
"""
order_data_file = "/tmp/order_data.csv"
order_data_df = pd.read_csv(order_data_file)
file_task = FileSensor(task_id="check_file", filepath="/tmp/order_data.csv")
order_data = extract_from_file()
file_task >> order_data
In the above code block, a new TaskFlow function is defined as ``extract_from_file`` which
reads the data from a known file location.
In the main DAG, a new ``FileSensor`` task is defined to check for this file. Please note
that this is a Sensor task which waits for the file.
Finally, a dependency between this Sensor task and the TaskFlow function is specified.
Consuming XComs between decorated and traditional tasks
-------------------------------------------------------
As noted above, the TaskFlow API allows XComs to be consumed or passed between tasks in a manner that is
abstracted away from the DAG author. This section dives further into detailed examples of how this is
possible not only between TaskFlow functions but between both TaskFlow functions *and* traditional tasks.
You may find it necessary to consume an XCom from traditional tasks, either pushed within the task's execution
or via its return value, as an input into downstream tasks. You can access the pushed XCom (also known as an
``XComArg``) by utilizing the ``.output`` property exposed for all operators.
By default, using the ``.output`` property to retrieve an XCom result is the equivalent of:
.. code-block:: python
task_instance.xcom_pull(task_ids="my_task_id", key="return_value")
To retrieve an XCom result for a key other than ``return_value``, you can use:
.. code-block:: python
my_op = MyOperator(...)
my_op_output = my_op.output["some_other_xcom_key"]
# OR
my_op_output = my_op.output.get("some_other_xcom_key")
.. note::
Using the ``.output`` property as an input to another task is supported only for operator parameters
listed as a ``template_field``.
In the code example below, a :class:`~airflow.providers.http.operators.http.SimpleHttpOperator` result
is captured via :doc:`XComs </concepts/xcoms>`. This XCom result, which is the task output, is then passed
to a TaskFlow function which parses the response as JSON.
.. code-block:: python
get_api_results_task = SimpleHttpOperator(
task_id="get_api_results",
endpoint="/api/query",
do_xcom_push=True,
http_conn_id="http",
)
@task
def parse_results(api_results):
return json.loads(api_results)
parsed_results = parse_results(api_results=get_api_results_task.output)
The reverse can also be done: passing the output of a TaskFlow function as an input to a traditional task.
.. code-block:: python
@task(retries=3)
def create_queue():
"""This is a Python function that creates an SQS queue"""
hook = SqsHook()
result = hook.create_queue(queue_name="sample-queue")
return result["QueueUrl"]
sqs_queue = create_queue()
publish_to_queue = SqsPublishOperator(
task_id="publish_to_queue",
sqs_queue=sqs_queue,
message_content="{{ task_instance }}-{{ execution_date }}",
message_attributes=None,
delay_seconds=0,
)
Take note in the code example above, the output from the ``create_queue`` TaskFlow function, the URL of a
newly-created Amazon SQS Queue, is then passed to a :class:`~airflow.providers.amazon.aws.operators.sqs.SqsPublishOperator`
task as the ``sqs_queue`` arg.
Finally, not only can you use traditional operator outputs as inputs for TaskFlow functions, but also as inputs to
other traditional operators. In the example below, the output from the :class:`~airflow.providers.amazon.aws.transfers.salesforce_to_s3.SalesforceToS3Operator`
task (which is an S3 URI for a destination file location) is used an input for the :class:`~airflow.providers.amazon.aws.operators.s3_copy_object.S3CopyObjectOperator`
task to copy the same file to a date-partitioned storage location in S3 for long-term storage in a data lake.
.. code-block:: python
BASE_PATH = "salesforce/customers"
FILE_NAME = "customer_daily_extract_{{ ds_nodash }}.csv"
upload_salesforce_data_to_s3_landing = SalesforceToS3Operator(
task_id="upload_salesforce_data_to_s3",
salesforce_query="SELECT Id, Name, Company, Phone, Email, LastModifiedDate, IsActive FROM Customers",
s3_bucket_name="landing-bucket",
s3_key=f"{BASE_PATH}/{FILE_NAME}",
salesforce_conn_id="salesforce",
aws_conn_id="s3",
replace=True,
)
store_to_s3_data_lake = S3CopyObjectOperator(
task_id="store_to_s3_data_lake",
aws_conn_id="s3",
source_bucket_key=upload_salesforce_data_to_s3_landing.output,
dest_bucket_name="data_lake",
dest_bucket_key=f"""{BASE_PATH}/{"{{ execution_date.strftime('%Y/%m/%d') }}"}/{FILE_NAME}""",
)
Accessing context variables in decorated tasks
----------------------------------------------
When running your callable, Airflow will pass a set of keyword arguments that can be used in your
function. This set of kwargs correspond exactly to what you can use in your Jinja templates.
For this to work, you need to define ``**kwargs`` in your function header, or you can add directly the
keyword arguments you would like to get - for example with the below code your callable will get
the values of ``ti`` and ``next_ds`` context variables. Note that when explicit keyword arguments are used,
they must be made optional in the function header to avoid ``TypeError`` exceptions during DAG parsing as
these values are not available until task execution.
With explicit arguments:
.. code-block:: python
@task
def my_python_callable(ti=None, next_ds=None):
pass
With kwargs:
.. code-block:: python
@task
def my_python_callable(**kwargs):
ti = kwargs["ti"]
next_ds = kwargs["next_ds"]
Also, sometimes you might want to access the context somewhere deep in the stack, but you do not want to pass
the context variables from the task callable. You can still access execution context via the ``get_current_context``
method.
.. code-block:: python
from airflow.operators.python import get_current_context
def some_function_in_your_library():
context = get_current_context()
ti = context["ti"]
Current context is accessible only during the task execution. The context is not accessible during
``pre_execute`` or ``post_execute``. Calling this method outside execution context will raise an error.
What's Next?
------------
You have seen how simple it is to write DAGs using the TaskFlow API paradigm within Airflow 2.0. Here are a few steps you might want to take next:
.. seealso::
- Continue to the next step of the tutorial: :doc:`/tutorial/pipeline`
- Read the :doc:`Concepts section </concepts/index>` for detailed explanation of Airflow concepts such as DAGs, Tasks, Operators, and more
- View the section on the :doc:`TaskFlow API </concepts/taskflow>` and the ``@task`` decorator.