.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Fundamental Concepts
====================
This tutorial walks you through some of the fundamental Airflow concepts,
objects, and their usage while writing your first DAG.
Example Pipeline definition
---------------------------
Here is an example of a basic pipeline definition. Do not worry if this looks
complicated; a line-by-line explanation follows below.
.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]
It's a DAG definition file
--------------------------
One thing to wrap your head around (it may not be very intuitive for everyone
at first) is that this Airflow Python script is really
just a configuration file specifying the DAG's structure as code.
The actual tasks defined here will run in a different context from
the context of this script. Different tasks run on different workers
at different points in time, which means that this script cannot be used
to cross-communicate between tasks. For that purpose, Airflow provides
a more advanced feature called :doc:`/concepts/xcoms`.
People sometimes think of the DAG definition file as a place where they
can do some actual data processing. That is not the case at all!
The script's purpose is to define a DAG object. It needs to evaluate
quickly (seconds, not minutes) because the scheduler executes it
periodically to pick up any changes.
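To make the parse-time versus run-time distinction concrete, here is a minimal plain-Python sketch. It does not import Airflow, and ``load_rows`` and ``process`` are hypothetical names used only for illustration. The point is that defining a callable costs nothing when the file is evaluated; the slow work happens only when the callable is invoked, which is where task logic belongs.

```python
# Toy illustration only: plain Python, not Airflow code.
calls = {"load_rows": 0}


def load_rows():
    """Hypothetical stand-in for slow data processing."""
    calls["load_rows"] += 1
    return [1, 2, 3]


def process():
    """Task-style callable: the slow work runs only when this is invoked."""
    return sum(load_rows())


# Evaluating the file up to here only defines functions; nothing slow has run.
parsed_calls = calls["load_rows"]

# Later, a worker would execute the task body, and only then does the work happen.
result = process()
```

Had ``load_rows()`` been called at module level instead, it would run every time the file is evaluated, not just when the task executes.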
Importing Modules
-----------------
An Airflow pipeline is just a Python script that happens to define an
Airflow DAG object. Let's start by importing the libraries we will need.
.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
:start-after: [START import_module]
:end-before: [END import_module]
See :doc:`/modules_management` for details on how Python and Airflow manage modules.
Default Arguments
-----------------
We're about to create a DAG and some tasks, and we have the choice to
explicitly pass a set of arguments to each task's constructor
(which would become redundant), or (better!) we can define a dictionary
of default parameters that we can use when creating tasks.
.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
:dedent: 4
:start-after: [START default_args]
:end-before: [END default_args]
For more information about the BaseOperator's parameters and what they do,
refer to the :py:class:`airflow.models.BaseOperator` documentation.
Also, note that you could easily define different sets of arguments that
would serve different purposes. An example of that would be to have
different settings between a production and development environment.
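As a rough sketch of that idea, the snippet below keeps two dictionaries and picks one at parse time. The environment variable name ``DEPLOY_ENV``, the helper ``select_default_args``, and the concrete values are assumptions made for illustration; only the keys (``owner``, ``retries``, ``retry_delay``, ``email_on_failure``) are regular BaseOperator arguments.

```python
import os
from datetime import timedelta

# Hypothetical values; tune them to your own environments.
PROD_ARGS = {
    "owner": "airflow",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
}
DEV_ARGS = {
    "owner": "airflow",
    "retries": 0,
    "email_on_failure": False,
}


def select_default_args(env_name):
    """Return a copy of the argument set for the given environment name."""
    chosen = PROD_ARGS if env_name == "production" else DEV_ARGS
    return dict(chosen)


# DEPLOY_ENV is an assumed variable name, not an Airflow setting.
default_args = select_default_args(os.environ.get("DEPLOY_ENV", "development"))
```

The resulting ``default_args`` dictionary can then be passed to the DAG in the same way as the single dictionary shown above.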
Instantiate a DAG
-----------------
We'll need a DAG object to nest our tasks into. Here we pass a string
that defines the ``dag_id``, which serves as a unique identifier for your DAG.
We also pass the default argument dictionary that we just defined and
define a ``schedule`` of 1 day for the DAG.
.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
:start-after: [START instantiate_dag]
:end-before: [END instantiate_dag]
Operators
---------
An operator defines a unit of work for Airflow to complete. Using operators is the classic approach
to defining work in Airflow. For some use cases, it's better to use the TaskFlow API to define
work in a Pythonic context as described in :doc:`taskflow`. For now, using operators helps to
visualize task dependencies in our DAG code.
All operators inherit from the BaseOperator, which includes all of the required arguments for
running work in Airflow. From here, each operator includes unique arguments for
the type of work it's completing. Some of the most popular operators are the PythonOperator, the BashOperator, and the
KubernetesPodOperator.
Airflow completes work based on the arguments you pass to your operators. In this tutorial, we
use the BashOperator to run a few bash scripts.
Tasks
-----
To use an operator in a DAG, you have to instantiate it as a task. Tasks
determine how to execute your operator's work within the context of a DAG.
In the following example, we instantiate the BashOperator as two separate tasks in order to run two
separate bash scripts. The first argument for each instantiation, ``task_id``,
acts as a unique identifier for the task.
.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
:dedent: 4
:start-after: [START basic_task]
:end-before: [END basic_task]
Notice how we pass a mix of operator specific arguments (``bash_command``) and
an argument common to all operators (``retries``) inherited
from BaseOperator to the operator's constructor. This is simpler than
passing every argument for every constructor call. Also, notice that in
the second task we override the ``retries`` parameter with ``3``.
The precedence rules for a task are as follows:
1. Explicitly passed arguments
2. Values that exist in the ``default_args`` dictionary
3. The operator's default value, if one exists
A task must include or inherit the arguments ``task_id`` and ``owner``;
otherwise, Airflow will raise an exception.
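The precedence rules can be pictured with a small plain-Python model. This is a sketch of the lookup order only, not Airflow's implementation; ``resolve_argument`` and ``OPERATOR_DEFAULTS`` are hypothetical names, and the operator default of ``0`` is a placeholder value.

```python
# Toy model of the lookup order; this is not how Airflow is implemented.
OPERATOR_DEFAULTS = {"retries": 0}  # placeholder for an operator's own defaults


def resolve_argument(name, explicit, default_args, operator_defaults=OPERATOR_DEFAULTS):
    """Return the value for `name`, following the three precedence rules."""
    if name in explicit:            # 1. explicitly passed arguments
        return explicit[name]
    if name in default_args:        # 2. values in the default_args dictionary
        return default_args[name]
    if name in operator_defaults:   # 3. the operator's default, if one exists
        return operator_defaults[name]
    raise KeyError(f"no value available for argument {name!r}")


default_args = {"retries": 1}

# First task: nothing passed explicitly, so default_args wins.
t1_retries = resolve_argument("retries", {}, default_args)
# Second task: the explicit retries=3 overrides default_args.
t2_retries = resolve_argument("retries", {"retries": 3}, default_args)
# No explicit value and no default_args entry: fall back to the operator default.
fallback_retries = resolve_argument("retries", {}, {})
```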
Templating with Jinja
---------------------
Airflow leverages the power of
`Jinja Templating <https://jinja.palletsprojects.com/en/2.11.x/>`_ and provides
the pipeline author
with a set of built-in parameters and macros. Airflow also provides
hooks for the pipeline author to define their own parameters, macros and
templates.
This tutorial barely scratches the surface of what you can do with
templating in Airflow, but the goal of this section is to let you know
this feature exists, get you familiar with double curly brackets, and
point to the most common template variable: ``{{ ds }}`` (today's "date
stamp").
.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
:dedent: 4
:start-after: [START jinja_template]
:end-before: [END jinja_template]
Notice that the ``templated_command`` contains code logic in ``{% %}`` blocks,
references parameters like ``{{ ds }}``, and calls a function as in
``{{ macros.ds_add(ds, 7)}}``.
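To get a feel for what ``{{ macros.ds_add(ds, 7) }}`` evaluates to, here is a standard-library stand-in. It assumes the macro shifts a ``YYYY-MM-DD`` date stamp by a number of days; the function below is an illustrative re-creation, not the code Airflow ships.

```python
from datetime import datetime, timedelta


def ds_add(ds, days):
    """Stand-in for macros.ds_add: shift a YYYY-MM-DD date stamp by `days`."""
    shifted = datetime.strptime(ds, "%Y-%m-%d") + timedelta(days=days)
    return shifted.strftime("%Y-%m-%d")


# With ds rendered as "2015-06-01", the template expression yields a date one week later.
week_later = ds_add("2015-06-01", 7)
# Negative offsets move backwards in time.
day_before = ds_add("2015-06-01", -1)
```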
Files can also be passed to the ``bash_command`` argument, like
``bash_command='templated_command.sh'``, where the file location is relative to
the directory containing the pipeline file (``tutorial.py`` in this case). This
may be desirable for many reasons, like separating your script's logic and
pipeline code, allowing for proper code highlighting in files composed in
different languages, and general flexibility in structuring pipelines. It is
also possible to define your ``template_searchpath`` as pointing to any folder
locations in the DAG constructor call.
Using that same DAG constructor call, it is possible to define
``user_defined_macros`` which allow you to specify your own variables.
For example, passing ``dict(foo='bar')`` to this argument allows you
to use ``{{ foo }}`` in your templates. Moreover, specifying
``user_defined_filters`` allows you to register your own filters. For example,
passing ``dict(hello=lambda name: 'Hello %s' % name)`` to this argument allows
you to use ``{{ 'world' | hello }}`` in your templates. For more information
regarding custom filters have a look at the
`Jinja Documentation <https://jinja.palletsprojects.com/en/latest/api/#custom-filters>`_.
For more information on the variables and macros that can be referenced
in templates, make sure to read through the :ref:`templates-ref`.
Adding DAG and Tasks documentation
----------------------------------
We can add documentation for a DAG or for each individual task. DAG documentation only supports
markdown so far, while task documentation supports plain text, markdown, reStructuredText,
json, and yaml. The DAG documentation can be written as a doc string at the beginning
of the DAG file (recommended), or anywhere else in the file. Below you can find some examples
of how to implement task and DAG docs, as well as screenshots:
.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
:dedent: 4
:start-after: [START documentation]
:end-before: [END documentation]
.. image:: ../img/task_doc.png
.. image:: ../img/dag_doc.png
Setting up Dependencies
-----------------------
We have tasks ``t1``, ``t2`` and ``t3`` that do not depend on each other. Here are a few ways
you can define dependencies between them:
.. code-block:: python
t1.set_downstream(t2)
# This means that t2 will depend on t1
# running successfully to run.
# It is equivalent to:
t2.set_upstream(t1)
# The bit shift operator can also be
# used to chain operations:
t1 >> t2
# And the upstream dependency with the
# bit shift operator:
t2 << t1
# Chaining multiple dependencies becomes
# concise with the bit shift operator:
t1 >> t2 >> t3
# A list of tasks can also be set as
# dependencies. These operations
# all have the same effect:
t1.set_downstream([t2, t3])
t1 >> [t2, t3]
[t2, t3] << t1
Note that when executing your script, Airflow will raise exceptions when
it finds cycles in your DAG or when a dependency is referenced more
than once.
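If the bit shift syntax looks like magic, the following toy class shows how such operators can be built with ordinary Python operator overloading. It is a simplified model written for this explanation, not Airflow's BaseOperator; the ``Task`` class and ``_as_list`` helper are hypothetical.

```python
def _as_list(tasks):
    """Accept a single task or a list of tasks."""
    return tasks if isinstance(tasks, list) else [tasks]


class Task:
    """Toy stand-in for a task; it only records downstream task ids."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()

    def set_downstream(self, others):
        for other in _as_list(others):
            self.downstream.add(other.task_id)

    def set_upstream(self, others):
        for other in _as_list(others):
            other.set_downstream(self)

    def __rshift__(self, other):   # self >> other
        self.set_downstream(other)
        return other               # returning the right side enables chaining

    def __lshift__(self, other):   # self << other
        self.set_upstream(other)
        return other

    def __rrshift__(self, other):  # [a, b] >> self
        self.set_upstream(other)
        return self

    def __rlshift__(self, other):  # [a, b] << self
        self.set_downstream(other)
        return self


a, b, c = Task("a"), Task("b"), Task("c")
a >> b >> c                # a -> b -> c

root, left, right = Task("root"), Task("left"), Task("right")
[left, right] << root      # root -> left and root -> right
```

Because a plain list does not define ``<<`` or ``>>``, Python falls back to the reflected methods on the task, which is what makes the list forms work.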
Using time zones
----------------
Creating a time zone aware DAG is quite simple. Just make sure to supply time zone aware dates
using ``pendulum``. Don't try to use the standard library
`timezone <https://docs.python.org/3/library/datetime.html#timezone-objects>`_ objects, as they are known to
have limitations and we deliberately disallow using them in DAGs.
Recap
-----
Alright, so we have a pretty basic DAG. At this point your code should look
something like this:
.. exampleinclude:: /../../airflow/example_dags/tutorial.py
:language: python
:start-after: [START tutorial]
:end-before: [END tutorial]
.. _testing:
Testing
--------
Running the Script
''''''''''''''''''
Time to run some tests. First, let's make sure the pipeline
is parsed successfully.
Let's assume we are saving the code from the previous step in
``tutorial.py`` in the DAGs folder referenced in your ``airflow.cfg``.
The default location for your DAGs is ``~/airflow/dags``.
.. code-block:: bash
python ~/airflow/dags/tutorial.py
If the script does not raise an exception it means that you have not done
anything horribly wrong, and that your Airflow environment is somewhat
sound.
Command Line Metadata Validation
'''''''''''''''''''''''''''''''''
Let's run a few commands to validate this script further.
.. code-block:: bash
# initialize the database tables
airflow db init
# print the list of active DAGs
airflow dags list
# print the list of tasks in the "tutorial" DAG
airflow tasks list tutorial
# print the hierarchy of tasks in the "tutorial" DAG
airflow tasks list tutorial --tree
Testing
'''''''
Let's test by running the actual task instances for a specific date. The date
specified in this context is called the *logical date* (also called *execution
date* for historical reasons), which simulates the scheduler running your task
or DAG for a specific date and time, even though it *physically* will run now
(or as soon as its dependencies are met).
We said the scheduler runs your task *for* a specific date and time, not *at*.
This is because each run of a DAG conceptually represents not a specific date
and time, but an interval between two times, called a
:ref:`data interval <data-interval>`. A DAG run's logical date is the start of
its data interval.
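The relationship between a logical date and its data interval can be worked through with a few lines of standard-library Python. This sketch assumes a plain daily schedule and uses naive datetimes for brevity; ``daily_data_interval`` is a hypothetical helper, not an Airflow function.

```python
from datetime import datetime, timedelta


def daily_data_interval(logical_date):
    """Return (start, end) of the interval covered by a daily run."""
    # The logical date marks the start of the interval; the end is one day later.
    return logical_date, logical_date + timedelta(days=1)


logical_date = datetime(2015, 6, 1)
interval_start, interval_end = daily_data_interval(logical_date)

# The run "for" 2015-06-01 covers 2015-06-01 00:00 up to 2015-06-02 00:00,
# so the scheduler can only start it once that interval has ended.
earliest_actual_start = interval_end
```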
.. code-block:: bash
# command layout: command subcommand [dag_id] [task_id] [(optional) date]
# testing print_date
airflow tasks test tutorial print_date 2015-06-01
# testing sleep
airflow tasks test tutorial sleep 2015-06-01
Now remember what we did with templating earlier? See how this template
gets rendered and executed by running this command:
.. code-block:: bash
# testing templated
airflow tasks test tutorial templated 2015-06-01
This should result in displaying a verbose log of events and ultimately
running your bash command and printing the result.
Note that the ``airflow tasks test`` command runs task instances locally, outputs
their log to stdout (on screen), does not bother with dependencies, and
does not communicate state (running, success, failed, ...) to the database.
It simply allows testing a single task instance.
The same applies to ``airflow dags test``, but on a DAG
level. It performs a single DAG run of the given DAG id. While it does take task
dependencies into account, no state is registered in the database. It is
convenient for locally testing a full run of your DAG, provided that anything your tasks
expect (for example, data at some location) is available.
Backfill
''''''''
Everything looks like it's running fine so let's run a backfill.
``backfill`` will respect your dependencies, emit logs into files and talk to
the database to record status. If you do have a webserver up, you will be able
to track the progress. ``airflow webserver`` will start a web server if you
are interested in tracking the progress visually as your backfill progresses.
Note that if you use ``depends_on_past=True``, individual task instances
will depend on the success of their previous task instance (that is, previous
according to the logical date). Task instances with their logical dates equal to
``start_date`` will disregard this dependency because there would be no past
task instances created for them.
You may also want to consider ``wait_for_downstream=True`` when using ``depends_on_past=True``.
While ``depends_on_past=True`` causes a task instance to depend on the success
of its previous task instance, ``wait_for_downstream=True`` will cause a task instance
to also wait for all task instances *immediately downstream* of the previous
task instance to succeed.
The date range in this context is a ``start_date`` and optionally an ``end_date``,
which are used to populate the run schedule with task instances from this DAG.
.. code-block:: bash
# optional, start a web server in debug mode in the background
# airflow webserver --debug &
# start your backfill on a date range
airflow dags backfill tutorial \
--start-date 2015-06-01 \
--end-date 2015-06-07
What's Next?
-------------
That's it! You have written, tested and backfilled your very first Airflow
pipeline. Merging your code into a repository that has a master scheduler
running against it should result in your DAG being triggered and run every day.
Here are a few things you might want to do next:
.. seealso::
- Continue to the next step of the tutorial: :doc:`/tutorial/taskflow`
- Skip to the :doc:`/concepts/index` section for a detailed explanation of Airflow concepts such as DAGs, Tasks, Operators, and more