| .. Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| .. http://www.apache.org/licenses/LICENSE-2.0 |
| |
| .. Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| |
| |
| Pythonic Dags with the TaskFlow API |
| =================================== |
| |
| In the first tutorial, you built your first Airflow Dag using traditional Operators like ``BashOperator``. |
| Now let's look at a more modern and Pythonic way to write workflows using the **TaskFlow API** — introduced in Airflow |
| 2.0. |
| |
| The TaskFlow API is designed to make your code simpler, cleaner, and easier to maintain. You write plain Python |
| functions, decorate them, and Airflow handles the rest — including task creation, dependency wiring, and passing data |
| between tasks. |
| |
In this tutorial, we'll use the TaskFlow API to create a simple ETL pipeline: Extract → Transform → Load.
| Let's dive in! |
| |
| The Big Picture: A TaskFlow Pipeline |
| ------------------------------------ |
| |
| Here's what the full pipeline looks like using TaskFlow. Don't worry if some of it looks unfamiliar — we'll break it |
| down step-by-step. |
| |
| .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py |
| :language: python |
| :start-after: [START tutorial] |
| :end-before: [END tutorial] |
| |
| Step 1: Define the Dag |
| ---------------------- |
| |
| Just like before, your Dag is a Python script that Airflow loads and parses. But this time, we're using the ``@dag`` |
| decorator to define it. |
| |
| .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py |
| :language: python |
| :start-after: [START instantiate_dag] |
| :end-before: [END instantiate_dag] |
| |
| | |
| |
| To make this Dag discoverable by Airflow, we can call the Python function that was decorated with ``@dag``: |
| |
| .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py |
| :language: python |
| :start-after: [START dag_invocation] |
| :end-before: [END dag_invocation] |
| |
| | |
| |
| .. versionchanged:: 2.4 |
| If you're using the ``@dag`` decorator or defining your Dag in a ``with`` block, you no longer need to assign it to a |
| global variable. Airflow will find it automatically. |
| |
| You can visualize your Dag in the Airflow UI! Once your Dag is loaded, navigate to the Graph View to see how tasks are |
| connected. |
| |
| Step 2: Write Your Tasks with ``@task`` |
| --------------------------------------- |
| |
| With TaskFlow, each task is just a regular Python function. You can use the ``@task`` decorator to turn it into a task |
| that Airflow can schedule and run. Here's the ``extract`` task: |
| |
| .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py |
| :language: python |
| :dedent: 4 |
| :start-after: [START extract] |
| :end-before: [END extract] |
| |
| | |
| |
The function's return value is passed to the next task — no manual use of ``XComs`` required. Under the hood, TaskFlow
uses ``XComs`` to manage data passing automatically, abstracting away the manual XCom management required by earlier
approaches. You'll define the ``transform`` and ``load`` tasks using the same pattern.

Notice the ``@task(multiple_outputs=True)`` decorator on the ``transform`` task in the full pipeline above: it tells
Airflow that the function returns a dictionary of values that should be split into individual XComs. Each key in the
returned dictionary becomes its own XCom entry, which makes it easy to reference specific values in downstream tasks.
If you omit ``multiple_outputs=True``, the entire dictionary is stored as a single XCom instead and must be accessed
as a whole.
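For example, here's a minimal sketch of such a task (the function and key names are illustrative, not part of the
tutorial Dag):

.. code-block:: python

    from airflow.sdk import task


    @task(multiple_outputs=True)
    def summarize(order_data_dict: dict):
        # Each key in this dictionary becomes its own XCom entry.
        return {"total_order_value": sum(order_data_dict.values())}


    # Downstream, a single key can be referenced directly:
    # order_summary = summarize(order_data)
    # load(order_summary["total_order_value"])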
| |
| Step 3: Build the Flow |
| ---------------------- |
| |
| Once the tasks are defined, you can build the pipeline by simply calling them like Python functions. Airflow uses this |
| functional invocation to set task dependencies and manage data passing. |
| |
| .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py |
| :language: python |
| :dedent: 4 |
| :start-after: [START main_flow] |
| :end-before: [END main_flow] |
| |
| | |
| |
| That's it! Airflow knows how to schedule and orchestrate your pipeline from this code alone. |
| |
| Running Your Dag |
| ---------------- |
| |
| To enable and trigger your Dag: |
| |
| 1. Navigate to the Airflow UI. |
| 2. Find your Dag in the list and click the toggle to enable it. |
| 3. You can trigger it manually by clicking the "Trigger Dag" button, or wait for it to run on its schedule. |
| |
| What's Happening Behind the Scenes? |
| ----------------------------------- |
| |
| If you've used Airflow 1.x, this probably feels like magic. Let's compare what's happening under the hood. |
| |
| The "Old Way": Manual Wiring and XComs |
| '''''''''''''''''''''''''''''''''''''' |
| |
| Before the TaskFlow API, you had to use Operators like ``PythonOperator`` and pass data manually between tasks using |
| ``XComs``. |
| |
| Here's what the same Dag might have looked like using the traditional approach: |
| |
| .. code-block:: python |
| |
| import json |
| import pendulum |
| from airflow.sdk import DAG |
| from airflow.providers.standard.operators.python import PythonOperator |
| |
| |
| def extract(): |
| # Old way: simulate extracting data from a JSON string |
| data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}' |
| return json.loads(data_string) |
| |
| |
| def transform(ti): |
| # Old way: manually pull from XCom |
| order_data_dict = ti.xcom_pull(task_ids="extract") |
| total_order_value = sum(order_data_dict.values()) |
| return {"total_order_value": total_order_value} |
| |
| |
| def load(ti): |
| # Old way: manually pull from XCom |
| total = ti.xcom_pull(task_ids="transform")["total_order_value"] |
| print(f"Total order value is: {total:.2f}") |
| |
| |
| with DAG( |
| dag_id="legacy_etl_pipeline", |
| schedule=None, |
| start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), |
| catchup=False, |
| tags=["example"], |
| ) as dag: |
| extract_task = PythonOperator(task_id="extract", python_callable=extract) |
| transform_task = PythonOperator(task_id="transform", python_callable=transform) |
| load_task = PythonOperator(task_id="load", python_callable=load) |
| |
| extract_task >> transform_task >> load_task |
| |
| .. note:: |
| This version produces the same result as the TaskFlow API example, but requires explicit management of ``XComs`` and task dependencies. |
| |
| The TaskFlow Way |
| '''''''''''''''' |
| |
| Using TaskFlow, all of this is handled automatically. |
| |
| .. exampleinclude:: /../src/airflow/example_dags/tutorial_taskflow_api.py |
| :language: python |
| :start-after: [START tutorial] |
| :end-before: [END tutorial] |
| |
| | |
| |
| Airflow still uses ``XComs`` and builds a dependency graph — it's just abstracted away so you can focus on your business |
| logic. |
| |
| How XComs Work |
| -------------- |
| |
TaskFlow return values are stored as ``XComs`` automatically, and can be inspected in the UI under the "XCom" tab.
Traditional operators can still pull these values explicitly with ``xcom_pull()``.
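For example, here's a minimal sketch (written inside a Dag body; the task and operator names are illustrative) of a
traditional operator pulling a TaskFlow task's return value:

.. code-block:: python

    from airflow.sdk import task
    from airflow.providers.standard.operators.bash import BashOperator


    @task
    def produce():
        return "hello"


    # TaskFlow stores the return value under the default XCom key "return_value",
    # so a plain template pull works. The template alone does not create a
    # dependency, so the ordering is set explicitly.
    consume = BashOperator(
        task_id="consume",
        bash_command="echo {{ ti.xcom_pull(task_ids='produce') }}",
    )
    produce() >> consume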
| |
| |
| Error Handling and Retries |
| --------------------------- |
| |
You can configure retries for your tasks directly in the ``@task`` decorator. For example, to allow up to three retry
attempts:
| |
| .. code-block:: python |
| |
| @task(retries=3) |
| def my_task(): ... |
| |
This helps ensure that transient failures (a flaky network, a brief service outage) do not permanently fail the task.
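Retries compose with the other standard arguments ``@task`` accepts. For example, a sketch that also spaces out the
attempts with ``retry_delay`` (the task name is hypothetical):

.. code-block:: python

    import datetime

    from airflow.sdk import task


    @task(retries=3, retry_delay=datetime.timedelta(minutes=5))
    def fetch_remote_data():
        # Airflow waits five minutes between each of the three retry attempts.
        ...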
| |
| |
| What to Explore Next |
| -------------------- |
| |
| Nice work! You've now written your first pipeline using the TaskFlow API. Curious where to go from here? |
| |
| - Add a new task to the Dag -- maybe a filter or validation step |
| - Modify return values and pass multiple outputs |
| - Explore retries and overrides with ``.override(task_id="...")`` |
| - Open the Airflow UI and inspect how the data flows between tasks, including task logs and dependencies |
| |
| .. seealso:: |
| |
| - Continue to the next step: :doc:`/tutorial/pipeline` |
| - Learn more in the :doc:`TaskFlow API docs </core-concepts/taskflow>` or continue below for :ref:`advanced-taskflow-patterns` |
| - Read about Airflow concepts in :doc:`/core-concepts/index` |
| |
| .. _advanced-taskflow-patterns: |
| |
| Advanced TaskFlow Patterns |
| -------------------------- |
| |
| Once you're comfortable with the basics, here are a few powerful techniques you can try. |
| |
| Reusing Decorated Tasks |
| ''''''''''''''''''''''' |
| |
| You can reuse decorated tasks across multiple Dags or Dag runs. This is especially useful for common logic like reusable |
| utilities or shared business rules. Use ``.override()`` to customize task metadata like ``task_id`` or ``retries``. |
| |
| .. code-block:: python |
| |
| start = add_task.override(task_id="start")(1, 2) |
| |
| You can even import decorated tasks from a shared module. |
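For example, a sketch with a hypothetical shared module path:

.. code-block:: python

    # my_company/tasks.py: a hypothetical shared module
    from airflow.sdk import task


    @task
    def add_task(x, y):
        return x + y


    # In a Dag file, import the decorated task and customize it per use:
    # from my_company.tasks import add_task
    # start = add_task.override(task_id="start")(1, 2)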
| |
| Handling Conflicting Dependencies |
| ''''''''''''''''''''''''''''''''' |
| |
| Sometimes tasks require different Python dependencies than the rest of your Dag — for example, specialized libraries or |
| system-level packages. TaskFlow supports multiple execution environments to isolate those dependencies. |
| |
| .. _taskflow-dynamically-created-virtualenv: |
| |
| **Dynamically Created Virtualenv** |
| |
| Creates a temporary virtualenv at task runtime. Great for experimental or dynamic tasks, but may have cold start |
| overhead. |
| |
.. exampleinclude:: /../../providers/standard/src/airflow/providers/standard/example_dags/example_python_decorator.py
| :language: python |
| :dedent: 4 |
| :start-after: [START howto_operator_python_venv] |
| :end-before: [END howto_operator_python_venv] |
| |
| | |
| |
| .. _taskflow-external-python-environment: |
| |
| **External Python Environment** |
| |
| Executes the task using a pre-installed Python interpreter — ideal for consistent environments or shared virtualenvs. |
| |
.. exampleinclude:: /../../providers/standard/src/airflow/providers/standard/example_dags/example_python_decorator.py
| :language: python |
| :dedent: 4 |
| :start-after: [START howto_operator_external_python] |
| :end-before: [END howto_operator_external_python] |
| |
| | |
| |
| .. _taskflow-docker_environment: |
| |
| **Docker Environment** |
| |
| Runs your task in a Docker container. Useful for packaging everything the task needs — but requires Docker to be |
| available on your worker. |
| |
| .. exampleinclude:: /../../providers/docker/tests/system/docker/example_taskflow_api_docker_virtualenv.py |
| :language: python |
| :dedent: 4 |
| :start-after: [START transform_docker] |
| :end-before: [END transform_docker] |
| |
| | |
| |
| .. note:: Requires Airflow 2.2 and the Docker provider. |
| |
.. _taskflow-kpo:
| |
| **KubernetesPodOperator** |
| |
| Runs your task inside a Kubernetes pod, fully isolated from the main Airflow environment. Ideal for large tasks or tasks |
| requiring custom runtimes. |
| |
| .. exampleinclude:: /../../providers/cncf/kubernetes/tests/system/cncf/kubernetes/example_kubernetes_decorator.py |
| :language: python |
| :dedent: 4 |
| :start-after: [START howto_operator_kubernetes] |
| :end-before: [END howto_operator_kubernetes] |
| |
| | |
| |
| .. note:: Requires Airflow 2.4 and the Kubernetes provider. |
| |
| .. _taskflow-using-sensors: |
| |
| Using Sensors |
| ''''''''''''' |
| |
| Use ``@task.sensor`` to build lightweight, reusable sensors using Python functions. These support both poke and reschedule |
| modes. |
| |
.. exampleinclude:: /../../providers/standard/src/airflow/providers/standard/example_dags/example_sensor_decorator.py
| :language: python |
| :start-after: [START tutorial] |
| :end-before: [END tutorial] |
| |
| Mixing with Traditional Tasks |
| ''''''''''''''''''''''''''''' |
| |
| You can combine decorated tasks with classic Operators. This is helpful when using community providers or when migrating |
| incrementally to TaskFlow. |
| |
| You can chain TaskFlow and traditional tasks using ``>>`` or pass data using the ``.output`` attribute. |
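Here's a minimal sketch (inside a Dag body; the task ids are illustrative) of passing a traditional operator's result
into a decorated task:

.. code-block:: python

    from airflow.sdk import task
    from airflow.providers.standard.operators.bash import BashOperator

    run_this = BashOperator(task_id="run_this", bash_command="echo 42")


    @task
    def consume(value):
        print(f"Received: {value}")


    # .output exposes the operator's XCom (for BashOperator, the last line of
    # stdout); passing it both supplies the data and wires run_this >> consume.
    consume(run_this.output)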
| |
| .. _taskflow/accessing_context_variables: |
| |
| Templating in TaskFlow |
| '''''''''''''''''''''' |
| |
| Like traditional tasks, decorated TaskFlow functions support templated arguments — including loading content from files |
| or using runtime parameters. |
| |
When running your callable, Airflow will pass a set of keyword arguments that can be used in your function. This set
of kwargs corresponds exactly to what you can use in your Jinja templates. To receive them, declare the context keys
you want as keyword arguments in your function signature.
| |
| For example, the callable in the code block below will get values of the ``ti`` |
| and ``next_ds`` context variables: |
| |
| .. code-block:: python |
| |
| @task |
| def my_python_callable(*, ti, next_ds): |
| pass |
| |
| |
You can also choose to receive the entire context with ``**kwargs``. Note that this incurs a slight performance
penalty, since Airflow has to expand the entire context, which likely contains many things you don't actually need.
Prefer explicit arguments, as demonstrated in the previous example.
| |
| .. code-block:: python |
| |
| @task |
| def my_python_callable(**kwargs): |
| ti = kwargs["ti"] |
| next_ds = kwargs["next_ds"] |
| |
Sometimes you might want to access the context deep in the call stack without passing context variables down from the
task callable. You can still access the execution context via the ``get_current_context`` function.
| |
| .. code-block:: python |
| |
| from airflow.sdk import get_current_context |
| |
| |
| def some_function_in_your_library(): |
| context = get_current_context() |
| ti = context["ti"] |
| |
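Note that ``get_current_context`` only works while a task is actually running; calling it outside of task execution
raises an error. Continuing the sketch above, the helper can be called from any decorated task:

.. code-block:: python

    @task
    def my_task():
        # Valid here: this runs inside the task's execution context.
        some_function_in_your_library()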
| |
Arguments passed to decorated functions are automatically templated. You can also render entire files as templates by
registering their extensions with ``templates_exts``:
| |
| .. code-block:: python |
| |
| @task(templates_exts=[".sql"]) |
| def read_sql(sql): ... |
| |
| |
| Conditional Execution |
| ''''''''''''''''''''' |
| |
| Use ``@task.run_if()`` or ``@task.skip_if()`` to control whether a task runs based on dynamic conditions at runtime — |
| without altering your Dag structure. |
| |
| .. code-block:: python |
| |
| @task.run_if(lambda ctx: ctx["task_instance"].task_id == "run") |
| @task.bash() |
| def echo(): |
| return "echo 'run'" |
| |
| What's Next |
| ----------- |
| |
| Now that you've seen how to build clean, maintainable Dags using the TaskFlow API, here are some good next steps: |
| |
| - Explore asset-aware workflows in :doc:`/authoring-and-scheduling/asset-scheduling` |
| - Dive into scheduling patterns in :ref:`Scheduling Options <scheduling-section>` |
| - Move to the next tutorial: :doc:`/tutorial/pipeline` |