.. Licensed to the Apache Software Foundation (ASF) under one
   or more contributor license agreements. See the NOTICE file
   distributed with this work for additional information
   regarding copyright ownership. The ASF licenses this file
   to you under the Apache License, Version 2.0 (the
   "License"); you may not use this file except in compliance
   with the License. You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
   software distributed under the License is distributed on an
   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   KIND, either express or implied. See the License for the
   specific language governing permissions and limitations
   under the License.


Tutorial
================

This tutorial walks you through some of the fundamental Airflow concepts,
objects, and their usage while writing your first pipeline.

Example Pipeline definition
---------------------------

Here is an example of a basic pipeline definition. Do not worry if this looks
complicated; a line-by-line explanation follows below.

.. code:: python

    """
    Code that goes along with the Airflow tutorial located at:
    https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
    """
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta


    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 6, 1),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }

    dag = DAG('tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)

    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)

    templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """

    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)

    t2.set_upstream(t1)
    t3.set_upstream(t1)


It's a DAG definition file
--------------------------

One thing to wrap your head around (it may not be very intuitive for everyone
at first) is that this Airflow Python script is really
just a configuration file specifying the DAG's structure as code.
The actual tasks defined here will run in a different context from
the context of this script. Different tasks run on different workers
at different points in time, which means that this script cannot be used
to cross-communicate between tasks. Note that for this
purpose we have a more advanced feature called ``XCom``.
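
To give a taste of what cross-communication looks like, here is a minimal
sketch using ``XCom`` with the ``PythonOperator``; the task ids, key, and
value are illustrative and not part of the tutorial pipeline:

.. code:: python

    from airflow.operators.python_operator import PythonOperator


    def push_value(**context):
        # store a small value in Airflow's metadata database
        context['ti'].xcom_push(key='my_key', value='hello')


    def pull_value(**context):
        # retrieve the value pushed by the upstream task
        print(context['ti'].xcom_pull(task_ids='push_task', key='my_key'))


    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_value,
        provide_context=True,
        dag=dag)

    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_value,
        provide_context=True,
        dag=dag)

    pull_task.set_upstream(push_task)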

People sometimes think of the DAG definition file as a place where they
can do some actual data processing - that is not the case at all!
The script's purpose is to define a DAG object. It needs to evaluate
quickly (seconds, not minutes) since the scheduler will execute it
periodically to reflect the changes, if any.


Importing Modules
-----------------

An Airflow pipeline is just a Python script that happens to define an
Airflow DAG object. Let's start by importing the libraries we will need.

.. code:: python

    # The DAG object; we'll need this to instantiate a DAG
    from airflow import DAG

    # Operators; we need this to operate!
    from airflow.operators.bash_operator import BashOperator

Default Arguments
-----------------
We're about to create a DAG and some tasks, and we have the choice to
explicitly pass a set of arguments to each task's constructor
(which would become redundant), or (better!) we can define a dictionary
of default parameters that we can use when creating tasks.

.. code:: python

    from datetime import datetime, timedelta

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 6, 1),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }

For more information about the BaseOperator's parameters and what they do,
refer to the :py:class:`airflow.models.BaseOperator` documentation.

Also, note that you could easily define different sets of arguments that
would serve different purposes. An example of that would be to have
different settings between a production and development environment.
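
As a hedged sketch of that idea, one could switch on an environment variable;
the ``DEPLOYMENT`` variable name and the alert address below are illustrative,
not an Airflow convention:

.. code:: python

    import os
    from datetime import timedelta

    # hypothetical switch: stricter alerting and retries in production
    if os.environ.get('DEPLOYMENT') == 'production':
        default_args = {
            'owner': 'airflow',
            'email': ['alerts@example.com'],
            'email_on_failure': True,
            'retries': 3,
            'retry_delay': timedelta(minutes=10),
        }
    else:
        default_args = {
            'owner': 'airflow',
            'email_on_failure': False,
            'retries': 0,
        }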


Instantiate a DAG
-----------------

We'll need a DAG object to nest our tasks into. Here we pass a string
that defines the ``dag_id``, which serves as a unique identifier for your DAG.
We also pass the default argument dictionary that we just defined and
define a ``schedule_interval`` of 1 day for the DAG.

.. code:: python

    dag = DAG(
        'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))
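
Note that ``schedule_interval`` also accepts cron expressions and preset
strings, so a DAG that should run once a day at midnight could equivalently
be declared as:

.. code:: python

    dag = DAG('tutorial', default_args=default_args, schedule_interval='@daily')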

Tasks
-----
Tasks are generated when instantiating operator objects. An object
instantiated from an operator is called a task. The first argument
``task_id`` acts as a unique identifier for the task.

.. code:: python

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)

    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)

Notice how we pass a mix of operator-specific arguments (``bash_command``) and
an argument common to all operators (``retries``) inherited
from BaseOperator to the operator's constructor. This is simpler than
passing every argument for every constructor call. Also, notice that in
the second task we override the ``retries`` parameter with ``3``.

The precedence rules for a task are as follows:

1. Explicitly passed arguments
2. Values that exist in the ``default_args`` dictionary
3. The operator's default value, if one exists
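
To make these rules concrete, here is an illustrative task (not part of the
tutorial pipeline) annotated with how each argument resolves given the
``default_args`` above:

.. code:: python

    t_example = BashOperator(
        task_id='precedence_example',
        bash_command='echo precedence',  # no default exists, must be passed
        retries=3,                       # explicit, beats default_args' 1
        # retry_delay is omitted, so default_args' 5 minutes applies
        dag=dag)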

A task must include or inherit the arguments ``task_id`` and ``owner``,
otherwise Airflow will raise an exception.

Templating with Jinja
---------------------
Airflow leverages the power of
`Jinja Templating <http://jinja.pocoo.org/docs/dev/>`_ and provides
the pipeline author
with a set of built-in parameters and macros. Airflow also provides
hooks for the pipeline author to define their own parameters, macros and
templates.

This tutorial barely scratches the surface of what you can do with
templating in Airflow, but the goal of this section is to let you know
this feature exists, get you familiar with double curly brackets, and
point to the most common template variable: ``{{ ds }}``, the execution
date's "date stamp" in ``YYYY-MM-DD`` format.

.. code:: python

    templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """

    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)

Notice that the ``templated_command`` contains code logic in ``{% %}`` blocks,
references parameters like ``{{ ds }}``, calls a function as in
``{{ macros.ds_add(ds, 7) }}``, and references a user-defined parameter
in ``{{ params.my_param }}``.

The ``params`` hook in ``BaseOperator`` allows you to pass a dictionary of
parameters and/or objects to your templates. Please take the time
to understand how the parameter ``my_param`` makes it through to the template.

Files can also be passed to the ``bash_command`` argument, like
``bash_command='templated_command.sh'``, where the file location is relative to
the directory containing the pipeline file (``tutorial.py`` in this case). This
may be desirable for many reasons, like separating your script's logic and
pipeline code, allowing for proper code highlighting in files composed in
different languages, and general flexibility in structuring pipelines. It is
also possible to set ``template_searchpath`` in the DAG constructor call to
point at any folder locations where template files are kept.
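
As a minimal sketch of the latter, assuming a hypothetical
``/home/airflow/scripts`` folder that holds ``templated_command.sh`` (the
``.sh`` extension is what makes ``BashOperator`` load and template the file's
contents):

.. code:: python

    dag = DAG(
        'tutorial_files',
        default_args=default_args,
        schedule_interval=timedelta(days=1),
        template_searchpath=['/home/airflow/scripts'])

    t4 = BashOperator(
        task_id='templated_file',
        # resolved against template_searchpath, then rendered with Jinja
        bash_command='templated_command.sh',
        dag=dag)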

Using that same DAG constructor call, it is possible to define
``user_defined_macros`` which allow you to specify your own variables.
For example, passing ``dict(foo='bar')`` to this argument allows you
to use ``{{ foo }}`` in your templates. Moreover, specifying
``user_defined_filters`` allows you to register your own filters. For example,
passing ``dict(hello=lambda name: 'Hello %s' % name)`` to this argument allows
you to use ``{{ 'world' | hello }}`` in your templates. For more information
regarding custom filters have a look at the
`Jinja Documentation <http://jinja.pocoo.org/docs/dev/api/#writing-filters>`_.
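
Putting the two together, a minimal sketch might look like the following; the
``tutorial_macros`` dag_id and ``hello_macros`` task id are illustrative:

.. code:: python

    dag = DAG(
        'tutorial_macros',
        default_args=default_args,
        schedule_interval=timedelta(days=1),
        user_defined_macros=dict(foo='bar'),
        user_defined_filters=dict(hello=lambda name: 'Hello %s' % name))

    t = BashOperator(
        task_id='hello_macros',
        # renders to: echo bar and Hello world
        bash_command="echo {{ foo }} and {{ 'world' | hello }}",
        dag=dag)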

For more information on the variables and macros that can be referenced
in templates, make sure to read through the :ref:`macros` section.

Setting up Dependencies
-----------------------
We have tasks ``t1``, ``t2`` and ``t3`` that do not depend on each other. Here
are a few ways you can define dependencies between them:

.. code:: python

    t1.set_downstream(t2)

    # This means that t2 will depend on t1
    # running successfully before it can run.
    # It is equivalent to:
    t2.set_upstream(t1)

    # The bit shift operator can also be
    # used to chain operations:
    t1 >> t2

    # And the upstream dependency with the
    # bit shift operator:
    t2 << t1

    # Chaining multiple dependencies becomes
    # concise with the bit shift operator:
    t1 >> t2 >> t3

    # A list of tasks can also be set as
    # dependencies. These operations
    # all have the same effect:
    t1.set_downstream([t2, t3])
    t1 >> [t2, t3]
    [t2, t3] << t1

Note that when executing your script, Airflow will raise exceptions when
it finds cycles in your DAG or when a dependency is referenced more
than once.

Recap
-----
Alright, so we have a pretty basic DAG. At this point your code should look
something like this:

.. code:: python

    """
    Code that goes along with the Airflow tutorial located at:
    https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/tutorial.py
    """
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta


    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 6, 1),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }

    dag = DAG(
        'tutorial', default_args=default_args, schedule_interval=timedelta(days=1))

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag)

    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        dag=dag)

    templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7) }}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """

    t3 = BashOperator(
        task_id='templated',
        bash_command=templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag=dag)

    t2.set_upstream(t1)
    t3.set_upstream(t1)

Testing
--------

Running the Script
''''''''''''''''''

Time to run some tests. First let's make sure that the pipeline
parses. Let's assume we're saving the code from the previous step in
``tutorial.py`` in the DAGs folder referenced in your ``airflow.cfg``.
The default location for your DAGs is ``~/airflow/dags``.

.. code-block:: bash

    python ~/airflow/dags/tutorial.py

If the script does not raise an exception it means that you haven't done
anything horribly wrong, and that your Airflow environment is somewhat
sound.

Command Line Metadata Validation
'''''''''''''''''''''''''''''''''
Let's run a few commands to validate this script further.

.. code-block:: bash

    # prints the list of active DAGs
    airflow list_dags

    # prints the list of tasks within the "tutorial" DAG
    airflow list_tasks tutorial

    # prints the hierarchy of tasks in the tutorial DAG
    airflow list_tasks tutorial --tree


Testing
'''''''
Let's test by running the actual task instances on a specific date. The
date specified in this context is an ``execution_date``, which simulates the
scheduler running your task or DAG at a specific date + time:

.. code-block:: bash

    # command layout: command subcommand dag_id task_id date

    # testing print_date
    airflow test tutorial print_date 2015-06-01

    # testing sleep
    airflow test tutorial sleep 2015-06-01

Now remember what we did with templating earlier? See how this template
gets rendered and executed by running this command:

.. code-block:: bash

    # testing templated
    airflow test tutorial templated 2015-06-01

This should result in displaying a verbose log of events and ultimately
running your bash command and printing the result.

Note that the ``airflow test`` command runs task instances locally, outputs
their log to stdout (on screen), doesn't bother with dependencies, and
doesn't communicate state (running, success, failed, ...) to the database.
It simply allows testing a single task instance.

Backfill
''''''''
Everything looks like it's running fine, so let's run a backfill.
``backfill`` will respect your dependencies, emit logs into files and talk to
the database to record status. If you do have a webserver up, you'll be able
to track the progress. ``airflow webserver`` will start a web server if you
are interested in tracking the progress visually as your backfill progresses.

Note that if you use ``depends_on_past=True``, individual task instances
will depend on the success of the preceding task instance, except for the
first one (whose ``execution_date`` is the ``start_date``), for which this
dependency is disregarded.
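
As a hedged illustration, enabling this behavior on a single task is just a
matter of passing the flag (shown here on the ``sleep`` task from above):

.. code:: python

    t2 = BashOperator(
        task_id='sleep',
        bash_command='sleep 5',
        retries=3,
        # each scheduled run now waits for the previous run of this
        # task to have succeeded
        depends_on_past=True,
        dag=dag)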

The date range in this context is a ``start_date`` and optionally an ``end_date``,
which are used to populate the run schedule with task instances from this DAG.

.. code-block:: bash

    # optional, start a web server in debug mode in the background
    # airflow webserver --debug &

    # start your backfill on a date range
    airflow backfill tutorial -s 2015-06-01 -e 2015-06-07

What's Next?
-------------
That's it, you've written, tested and backfilled your very first Airflow
pipeline. Merging your code into a repository that has a master scheduler
running against it should get it triggered and run every day.

Here are a few things you might want to do next:

* Take an in-depth tour of the UI - click all the things!
* Keep reading the docs! Especially the sections on:

  * Command line interface
  * Operators
  * Macros

* Write your first pipeline!