.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements.  See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership.  The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied.  See the License for the
   specific language governing permissions and limitations
   under the License.

.. _sdk-dynamic-task-mapping:
Dynamic Task Mapping with Task SDK
==================================
Dynamic Task Mapping allows tasks defined with the Task SDK to generate
a variable number of task instances at runtime based on upstream data.
This is enabled via the ``expand()`` method on tasks, providing a way
to parallelize execution without knowing the number of tasks ahead of time.
Simple Mapping
--------------
Map over a Python list directly in the DAG:

.. code-block:: python

    from datetime import datetime

    from airflow.sdk import DAG, task


    @task
    def add_one(x: int):
        return x + 1


    @task
    def sum_it(values: list[int]):
        print(f"Total was {sum(values)}")


    with DAG(dag_id="dynamic-map-simple", start_date=datetime(2022, 1, 1)) as dag:
        summed = sum_it(values=add_one.expand(x=[1, 2, 3, 4, 5]))

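
As a rough mental model, the DAG above behaves like a map step followed by a single reduce step. The sketch below is plain Python, not Airflow code: the function names only mirror the tasks above, and in Airflow each ``add_one`` call would be a separate task instance that can run in parallel.

```python
# Plain-Python model of the "dynamic-map-simple" DAG; no Airflow required.
def add_one(x: int) -> int:
    return x + 1


def sum_it(values: list[int]) -> int:
    return sum(values)


# expand(x=[1, 2, 3, 4, 5]) creates one add_one instance per list element.
mapped_results = [add_one(x) for x in [1, 2, 3, 4, 5]]

# sum_it runs once and receives the results of all mapped instances.
total = sum_it(mapped_results)
print(f"Total was {total}")  # Total was 20
```
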
Task-Generated Mapping
----------------------
Generate the list at runtime from an upstream task:

.. code-block:: python

    from datetime import datetime

    from airflow.sdk import DAG, task


    @task
    def make_list():
        # This could fetch data from an API, database, etc.
        return ["a", "b", "c"]


    @task
    def consume(item: str):
        print(item)


    with DAG(dag_id="dynamic-map-generated", start_date=datetime(2022, 1, 1)) as dag:
        # One consume instance is created per element returned by make_list.
        consume.expand(item=make_list())

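
The number of ``consume`` instances is not known when the DAG is parsed; it is fixed only after ``make_list`` has run. The plain-Python sketch below models that behavior and is not Airflow code. ``run_mapped`` is a hypothetical helper invented for illustration, and this ``consume`` returns a value instead of printing so that the result can be inspected. It pairs each element with a zero-based index, similar to the map index Airflow assigns to mapped task instances.

```python
from typing import Callable, Iterable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def run_mapped(fn: Callable[[T], R], items: Iterable[T]) -> dict[int, R]:
    """Hypothetical helper: call fn once per item, keyed by a zero-based index."""
    return {index: fn(item) for index, item in enumerate(items)}


def make_list() -> list[str]:
    # Stand-in for an upstream task whose output length is only known at runtime.
    return ["a", "b", "c"]


def consume(item: str) -> str:
    # Stand-in for per-item work; upper-casing is purely illustrative.
    return item.upper()


results = run_mapped(consume, make_list())
print(results)  # {0: 'A', 1: 'B', 2: 'C'}
```
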
Details
-------

- Only keyword arguments can be passed to ``expand()``.
- The values collected from a mapped task reach downstream tasks as a lazy
  proxy object rather than a plain list. To force evaluation into a
  concrete list, wrap the proxy in ``list()``.
- Combine static parameters with mapped ones using ``partial()``:

  .. code-block:: python

      from datetime import datetime

      from airflow.sdk import DAG, task


      @task
      def add(x: int, y: int):
          return x + y


      with DAG(dag_id="map-with-partial", start_date=datetime(2022, 1, 1)) as dag:
          # y is fixed at 10 for every mapped instance; x varies per instance.
          add.partial(y=10).expand(x=[1, 2, 3])

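
The effect of ``partial()`` is close to that of ``functools.partial`` in the standard library: the static argument is bound once, and the mapped argument varies per instance. The plain-Python analogy below is not Airflow code; it only shows the values the three mapped ``add`` instances would be expected to return.

```python
from functools import partial


def add(x: int, y: int) -> int:
    return x + y


# Bind the static argument once, as add.partial(y=10) does in the DAG above.
add_ten = partial(add, y=10)

# One call per mapped value, as expand(x=[1, 2, 3]) would schedule.
results = [add_ten(x=x) for x in [1, 2, 3]]
print(results)  # [11, 12, 13]
```
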
Advanced Usage
--------------
For advanced patterns, such as repeated mapping, cross-product mapping,
named mappings (via ``map_index_template``), and handling large
datasets, see the Airflow Core documentation:
`Dynamic Task Mapping in the Airflow Core docs <https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/dynamic-task-mapping.html>`_.
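
To give a flavor of cross-product mapping: passing more than one keyword argument to ``expand()`` creates one task instance per combination of the mapped values. The sketch below enumerates those combinations in plain Python with ``itertools.product`` and is not Airflow code. The argument names ``x`` and ``y`` and their values are illustrative assumptions, and the order in which Airflow expands the combinations should not be relied on, so the results are sorted before display.

```python
from itertools import product


def add(x: int, y: int) -> int:
    return x + y


xs = [2, 4, 8]
ys = [5, 10]

# One keyword-argument set per (x, y) pair, as expand(x=xs, y=ys) would produce.
combinations = [{"x": x, "y": y} for x, y in product(xs, ys)]

# Sort the results because expansion order is not something to depend on.
results = sorted(add(**kwargs) for kwargs in combinations)

print(len(combinations))  # 6
print(results)  # [7, 9, 12, 13, 14, 18]
```
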