| |
| |
| <!-- |
| Javascript to render AIRFLOW-XXX and PR references in text |
| as HTML links. |
| |
| Overrides extrahead block from sphinx_rtd_theme |
| https://www.sphinx-doc.org/en/master/templating.html |
| --> |
| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>Concepts — Airflow Documentation</title> |
| |
| |
| |
| |
| <link rel="shortcut icon" href="_static/pin_32.png"/> |
| |
| |
| |
| |
| |
| <script type="text/javascript" src="_static/js/modernizr.min.js"></script> |
| |
| |
| <script type="text/javascript" id="documentation_options" data-url_root="./" src="_static/documentation_options.js"></script> |
| <script type="text/javascript" src="_static/jquery.js"></script> |
| <script type="text/javascript" src="_static/underscore.js"></script> |
| <script type="text/javascript" src="_static/doctools.js"></script> |
| <script type="text/javascript" src="_static/language_data.js"></script> |
| |
| <script type="text/javascript" src="_static/js/theme.js"></script> |
| |
| |
| |
| |
| <link rel="stylesheet" href="_static/css/theme.css" type="text/css" /> |
| <link rel="stylesheet" href="_static/pygments.css" type="text/css" /> |
| <link rel="stylesheet" href="_static/graphviz.css" type="text/css" /> |
| <link rel="index" title="Index" href="genindex.html" /> |
| <link rel="search" title="Search" href="search.html" /> |
| <link rel="next" title="Data Profiling" href="profiling.html" /> |
| <link rel="prev" title="UI / Screenshots" href="ui.html" /> |
| |
| <script> |
| // Once the DOM is parsed, turn [AIRFLOW-NNN] issue keys and (#NNN) pull |
| // request numbers inside the element with id "changelog" into hyperlinks. |
| // Pages that have no such element are left untouched. |
| document.addEventListener('DOMContentLoaded', function() { |
| var changelog = document.getElementById('changelog'); |
| if (changelog === null) { |
| return; |
| } |
| // Run both substitutions on the serialized markup and assign the result |
| // back once, so the subtree is re-parsed a single time instead of twice. |
| changelog.innerHTML = changelog.innerHTML |
| // [AIRFLOW-...] -> link to the Jira issue |
| .replace( |
| /\[(AIRFLOW-[\d]+)\]/g, |
| `<a href="https://issues.apache.org/jira/browse/$1">[$1]</a>` |
| ) |
| // (#...) -> link to the GitHub pull request |
| .replace( |
| /\(#([\d]+)\)/g, |
| `<a href="https://github.com/apache/airflow/pull/$1">(#$1)</a>` |
| ); |
| }); |
| </script> |
| <script type="text/javascript"> |
| // Reuse an existing global _gaq command queue if one is already defined, |
| // otherwise start with an empty array, then enqueue the account id and a |
| // page-view command. NOTE(review): this looks like the legacy Google |
| // Analytics ga.js queue; the tracker loader is not visible in this part |
| // of the file - confirm it is included elsewhere. |
| var _gaq = _gaq || []; |
| _gaq.push( |
| ['_setAccount', 'UA-140539454-1'], |
| ['_trackPageview'] |
| ); |
| </script> |
| <style> |
| /* Coloured header bar; positioned so the button below can be anchored to it. */ |
| .example-header { |
| position: relative; |
| background: #9AAA7A; |
| padding: 8px 16px; |
| margin-bottom: 0; |
| } |
| /* Modifier: widens the right padding of the header bar. */ |
| /* NOTE(review): presumably reserves room for .example-header-button - confirm. */ |
| .example-header--with-button { |
| padding-right: 166px; |
| } |
| /* Clearfix so the header bar contains any floated children. */ |
| .example-header:after{ |
| content: ''; |
| display: table; |
| clear: both; |
| } |
| /* Title text inside the header bar; scrolls horizontally instead of overflowing. */ |
| .example-title { |
| display:block; |
| padding: 4px; |
| margin-right: 16px; |
| color: white; |
| overflow-x: auto; |
| } |
| /* Button pinned to the top-right corner of its positioned ancestor. */ |
| .example-header-button { |
| top: 8px; |
| right: 16px; |
| position: absolute; |
| } |
| /* Remove the gap between a header bar and the Python listing that follows it. */ |
| .example-header + .highlight-python { |
| margin-top: 0 !important; |
| } |
| /* Button-styled link. */ |
| .viewcode-button { |
| display: inline-block; |
| padding: 8px 16px; |
| border: 0; |
| margin: 0; |
| outline: 0; |
| border-radius: 2px; |
| /* Prefixed and standard shadows must match; the prefixed one previously used a 5px blur. */ |
| -webkit-box-shadow: 0 3px 6px 0 rgba(0,0,0,.3); |
| box-shadow: 0 3px 6px 0 rgba(0,0,0,.3); |
| color: #404040; |
| background-color: #e7e7e7; |
| cursor: pointer; |
| font-size: 16px; |
| font-weight: 500; |
| line-height: 1; |
| text-decoration: none; |
| text-overflow: ellipsis; |
| overflow: hidden; |
| text-transform: uppercase; |
| -webkit-transition: background-color .2s; |
| transition: background-color .2s; |
| vertical-align: middle; |
| white-space: nowrap; |
| } |
| /* Keep the text colour unchanged for visited links. */ |
| .viewcode-button:visited { |
| color: #404040; |
| } |
| /* Darker background on hover and keyboard focus (the default outline is removed above). */ |
| .viewcode-button:hover, .viewcode-button:focus { |
| color: #404040; |
| background-color: #d6d6d6; |
| } |
| </style> |
| |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search" > |
| |
| |
| |
| <a href="index.html" class="icon icon-home"> Airflow |
| |
| |
| |
| </a> |
| |
| |
| |
| |
| <div class="version"> |
| 1.10.6 |
| </div> |
| |
| |
| |
| |
| <div role="search"> |
| <!-- Documentation search form: submits the query to search.html via GET. --> |
| <form id="rtd-search-form" class="wy-form" action="search.html" method="get"> |
| <!-- The placeholder is not an accessible name, so the field is labelled explicitly. --> |
| <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| |
| |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| |
| |
| |
| |
| |
| |
| <ul class="current"> |
| <li class="toctree-l1"><a class="reference internal" href="project.html">Project</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="license.html">License</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="start.html">Quick Start</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="installation.html">Installation</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="tutorial.html">Tutorial</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="howto/index.html">How-to Guides</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="ui.html">UI / Screenshots</a></li> |
| <li class="toctree-l1 current"><a class="current reference internal" href="#">Concepts</a><ul> |
| <li class="toctree-l2"><a class="reference internal" href="#core-ideas">Core Ideas</a><ul> |
| <li class="toctree-l3"><a class="reference internal" href="#dags">DAGs</a><ul> |
| <li class="toctree-l4"><a class="reference internal" href="#scope">Scope</a></li> |
| <li class="toctree-l4"><a class="reference internal" href="#default-arguments">Default Arguments</a></li> |
| <li class="toctree-l4"><a class="reference internal" href="#context-manager">Context Manager</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l3"><a class="reference internal" href="#operators">Operators</a><ul> |
| <li class="toctree-l4"><a class="reference internal" href="#dag-assignment">DAG Assignment</a></li> |
| <li class="toctree-l4"><a class="reference internal" href="#bitshift-composition">Bitshift Composition</a></li> |
| <li class="toctree-l4"><a class="reference internal" href="#relationship-helper">Relationship Helper</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l3"><a class="reference internal" href="#tasks">Tasks</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#task-instances">Task Instances</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#task-lifecycle">Task Lifecycle</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#workflows">Workflows</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l2"><a class="reference internal" href="#additional-functionality">Additional Functionality</a><ul> |
| <li class="toctree-l3"><a class="reference internal" href="#hooks">Hooks</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#pools">Pools</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#connections">Connections</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#queues">Queues</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#xcoms">XComs</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#variables">Variables</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#branching">Branching</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#subdags">SubDAGs</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#slas">SLAs</a><ul> |
| <li class="toctree-l4"><a class="reference internal" href="#email-configuration">Email Configuration</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l3"><a class="reference internal" href="#trigger-rules">Trigger Rules</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#latest-run-only">Latest Run Only</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#zombies-undeads">Zombies & Undeads</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#cluster-policy">Cluster Policy</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#documentation-notes">Documentation & Notes</a></li> |
| <li class="toctree-l3"><a class="reference internal" href="#id1">Jinja Templating</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l2"><a class="reference internal" href="#packaged-dags">Packaged DAGs</a></li> |
| <li class="toctree-l2"><a class="reference internal" href="#airflowignore">.airflowignore</a></li> |
| </ul> |
| </li> |
| <li class="toctree-l1"><a class="reference internal" href="profiling.html">Data Profiling</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="cli.html">Command Line Interface Reference</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="scheduler.html">Scheduling & Triggers</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="plugins.html">Plugins</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="security.html">Security</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="timezone.html">Time zones</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="api.html">REST API Reference</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="integration.html">Integration</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="metrics.html">Metrics</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="errors.html">Error Tracking</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="kubernetes.html">Kubernetes</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="lineage.html">Lineage</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="changelog.html">Changelog</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="faq.html">FAQ</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="macros.html">Macros reference</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="_api/index.html">Python API Reference</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="privacy_notice.html">Privacy Notice</a></li> |
| </ul> |
| <p class="caption"><span class="caption-text">References</span></p> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="_api/index.html">Python API</a></li> |
| </ul> |
| |
| |
| |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="index.html">Airflow</a> |
| |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| |
| <ul class="wy-breadcrumbs"> |
| |
| <li><a href="index.html">Docs</a> »</li> |
| |
| <li>Concepts</li> |
| |
| |
| <li class="wy-breadcrumbs-aside"> |
| |
| |
| <a href="_sources/concepts.rst.txt" rel="nofollow"> View page source</a> |
| |
| |
| </li> |
| |
| </ul> |
| |
| |
| <hr/> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <blockquote> |
| <div></div></blockquote> |
| <div class="section" id="concepts"> |
| <h1>Concepts<a class="headerlink" href="#concepts" title="Permalink to this headline">¶</a></h1> |
| <p>The Airflow platform is a tool for describing, executing, and monitoring |
| workflows.</p> |
| <div class="section" id="core-ideas"> |
| <h2>Core Ideas<a class="headerlink" href="#core-ideas" title="Permalink to this headline">¶</a></h2> |
| <div class="section" id="dags"> |
| <h3>DAGs<a class="headerlink" href="#dags" title="Permalink to this headline">¶</a></h3> |
| <p>In Airflow, a <code class="docutils literal notranslate"><span class="pre">DAG</span></code> – or a Directed Acyclic Graph – is a collection of all |
| the tasks you want to run, organized in a way that reflects their relationships |
| and dependencies.</p> |
| <p>For example, a simple DAG could consist of three tasks: A, B, and C. It could |
| say that A has to run successfully before B can run, but C can run anytime. It |
| could say that task A times out after 5 minutes, and B can be restarted up to 5 |
| times in case it fails. It might also say that the workflow will run every night |
| at 10pm, but shouldn’t start until a certain date.</p> |
| <p>In this way, a DAG describes <em>how</em> you want to carry out your workflow; but |
| notice that we haven’t said anything about <em>what</em> we actually want to do! A, B, |
| and C could be anything. Maybe A prepares data for B to analyze while C sends an |
| email. Or perhaps A monitors your location so B can open your garage door while |
| C turns on your house lights. The important thing is that the DAG isn’t |
| concerned with what its constituent tasks do; its job is to make sure that |
| whatever they do happens at the right time, or in the right order, or with the |
| right handling of any unexpected issues.</p> |
| <p>DAGs are defined in standard Python files that are placed in Airflow’s |
| <code class="docutils literal notranslate"><span class="pre">DAG_FOLDER</span></code>. Airflow will execute the code in each file to dynamically build |
| the <code class="docutils literal notranslate"><span class="pre">DAG</span></code> objects. You can have as many DAGs as you want, each describing an |
| arbitrary number of tasks. In general, each one should correspond to a single |
| logical workflow.</p> |
| <div class="admonition note"> |
| <p class="admonition-title">Note</p> |
| <p>When searching for DAGs, Airflow only considers python files |
| that contain the strings “airflow” and “DAG” by default. To consider |
| all python files instead, disable the <code class="docutils literal notranslate"><span class="pre">DAG_DISCOVERY_SAFE_MODE</span></code> |
| configuration flag.</p> |
| </div> |
| <div class="section" id="scope"> |
| <h4>Scope<a class="headerlink" href="#scope" title="Permalink to this headline">¶</a></h4> |
| <p>Airflow will load any <code class="docutils literal notranslate"><span class="pre">DAG</span></code> object it can import from a DAG file. Critically, |
| that means the DAG must appear in <code class="docutils literal notranslate"><span class="pre">globals()</span></code>. Consider the following two |
| DAGs. Only <code class="docutils literal notranslate"><span class="pre">dag_1</span></code> will be loaded; the other one only appears in a local |
| scope.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">dag_1</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'this_dag_will_be_discovered'</span><span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">my_function</span><span class="p">():</span> |
| <span class="n">dag_2</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'but_this_dag_will_not'</span><span class="p">)</span> |
| |
| <span class="n">my_function</span><span class="p">()</span> |
| </pre></div> |
| </div> |
| <p>Sometimes this can be put to good use. For example, a common pattern with |
| <code class="docutils literal notranslate"><span class="pre">SubDagOperator</span></code> is to define the subdag inside a function so that Airflow |
| doesn’t try to load it as a standalone DAG.</p> |
| </div> |
| <div class="section" id="default-arguments"> |
| <h4>Default Arguments<a class="headerlink" href="#default-arguments" title="Permalink to this headline">¶</a></h4> |
| <p>If a dictionary of <code class="docutils literal notranslate"><span class="pre">default_args</span></code> is passed to a DAG, it will apply them to |
| any of its operators. This makes it easy to apply a common parameter to many operators without having to type it many times.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">default_args</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="s1">'start_date'</span><span class="p">:</span> <span class="n">datetime</span><span class="p">(</span><span class="mi">2016</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">),</span> |
| <span class="s1">'owner'</span><span class="p">:</span> <span class="s1">'Airflow'</span> |
| <span class="p">}</span> |
| |
| <span class="n">dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'my_dag'</span><span class="p">,</span> <span class="n">default_args</span><span class="o">=</span><span class="n">default_args</span><span class="p">)</span> |
| <span class="n">op</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'dummy'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| <span class="k">print</span><span class="p">(</span><span class="n">op</span><span class="o">.</span><span class="n">owner</span><span class="p">)</span> <span class="c1"># Airflow</span> |
| </pre></div> |
| </div> |
| </div> |
| <div class="section" id="context-manager"> |
| <h4>Context Manager<a class="headerlink" href="#context-manager" title="Permalink to this headline">¶</a></h4> |
| <p><em>Added in Airflow 1.8</em></p> |
| <p>DAGs can be used as context managers to automatically assign new operators to that DAG.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">with</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'my_dag'</span><span class="p">,</span> <span class="n">start_date</span><span class="o">=</span><span class="n">datetime</span><span class="p">(</span><span class="mi">2016</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">))</span> <span class="k">as</span> <span class="n">dag</span><span class="p">:</span> |
| <span class="n">op</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'op'</span><span class="p">)</span> |
| |
| <span class="n">op</span><span class="o">.</span><span class="n">dag</span> <span class="ow">is</span> <span class="n">dag</span> <span class="c1"># True</span> |
| </pre></div> |
| </div> |
| </div> |
| </div> |
| <div class="section" id="operators"> |
| <span id="concepts-operators"></span><h3>Operators<a class="headerlink" href="#operators" title="Permalink to this headline">¶</a></h3> |
| <p>While DAGs describe <em>how</em> to run a workflow, <code class="docutils literal notranslate"><span class="pre">Operators</span></code> determine what |
| actually gets done.</p> |
| <p>An operator describes a single task in a workflow. Operators are usually (but |
| not always) atomic, meaning they can stand on their own and don’t need to share |
| resources with any other operators. The DAG will make sure that operators run in |
| the correct order; other than those dependencies, operators generally |
| run independently. In fact, they may run on two completely different machines.</p> |
| <p>This is a subtle but very important point: in general, if two operators need to |
| share information, like a filename or small amount of data, you should consider |
| combining them into a single operator. If it absolutely can’t be avoided, |
| Airflow does have a feature for operator cross-communication called XCom that is |
| described elsewhere in this document.</p> |
| <p>Airflow provides operators for many common tasks, including:</p> |
| <ul class="simple"> |
| <li><p><a class="reference internal" href="_api/airflow/operators/bash_operator/index.html#airflow.operators.bash_operator.BashOperator" title="airflow.operators.bash_operator.BashOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">BashOperator</span></code></a> - executes a bash command</p></li> |
| <li><p><a class="reference internal" href="_api/airflow/operators/python_operator/index.html#airflow.operators.python_operator.PythonOperator" title="airflow.operators.python_operator.PythonOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">PythonOperator</span></code></a> - calls an arbitrary Python function</p></li> |
| <li><p><a class="reference internal" href="_api/airflow/operators/email_operator/index.html#airflow.operators.email_operator.EmailOperator" title="airflow.operators.email_operator.EmailOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">EmailOperator</span></code></a> - sends an email</p></li> |
| <li><p><a class="reference internal" href="_api/airflow/operators/http_operator/index.html#airflow.operators.http_operator.SimpleHttpOperator" title="airflow.operators.http_operator.SimpleHttpOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">SimpleHttpOperator</span></code></a> - sends an HTTP request</p></li> |
| <li><p><a class="reference internal" href="_api/airflow/operators/mysql_operator/index.html#airflow.operators.mysql_operator.MySqlOperator" title="airflow.operators.mysql_operator.MySqlOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">MySqlOperator</span></code></a>, |
| <a class="reference internal" href="_api/airflow/operators/sqlite_operator/index.html#airflow.operators.sqlite_operator.SqliteOperator" title="airflow.operators.sqlite_operator.SqliteOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">SqliteOperator</span></code></a>, |
| <a class="reference internal" href="_api/airflow/operators/postgres_operator/index.html#airflow.operators.postgres_operator.PostgresOperator" title="airflow.operators.postgres_operator.PostgresOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">PostgresOperator</span></code></a>, |
| <a class="reference internal" href="_api/airflow/operators/mssql_operator/index.html#airflow.operators.mssql_operator.MsSqlOperator" title="airflow.operators.mssql_operator.MsSqlOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">MsSqlOperator</span></code></a>, |
| <a class="reference internal" href="_api/airflow/operators/oracle_operator/index.html#airflow.operators.oracle_operator.OracleOperator" title="airflow.operators.oracle_operator.OracleOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">OracleOperator</span></code></a>, |
| <a class="reference internal" href="_api/airflow/operators/jdbc_operator/index.html#airflow.operators.jdbc_operator.JdbcOperator" title="airflow.operators.jdbc_operator.JdbcOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">JdbcOperator</span></code></a>, etc. - executes a SQL command</p></li> |
| <li><p><code class="docutils literal notranslate"><span class="pre">Sensor</span></code> - waits for a certain time, file, database row, S3 key, etc…</p></li> |
| </ul> |
| <p>In addition to these basic building blocks, there are many more specific |
| operators: <a class="reference internal" href="_api/airflow/operators/docker_operator/index.html#airflow.operators.docker_operator.DockerOperator" title="airflow.operators.docker_operator.DockerOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">DockerOperator</span></code></a>, |
| <a class="reference internal" href="_api/airflow/operators/hive_operator/index.html#airflow.operators.hive_operator.HiveOperator" title="airflow.operators.hive_operator.HiveOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">HiveOperator</span></code></a>, <a class="reference internal" href="_api/airflow/operators/s3_file_transform_operator/index.html#airflow.operators.s3_file_transform_operator.S3FileTransformOperator" title="airflow.operators.s3_file_transform_operator.S3FileTransformOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">S3FileTransformOperator</span></code></a>, |
| <a class="reference internal" href="_api/airflow/operators/presto_to_mysql/index.html#airflow.operators.presto_to_mysql.PrestoToMySqlTransfer" title="airflow.operators.presto_to_mysql.PrestoToMySqlTransfer"><code class="xref py py-class docutils literal notranslate"><span class="pre">PrestoToMySqlTransfer</span></code></a>, |
| <a class="reference internal" href="_api/airflow/operators/slack_operator/index.html#airflow.operators.slack_operator.SlackAPIOperator" title="airflow.operators.slack_operator.SlackAPIOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">SlackAPIOperator</span></code></a>… you get the idea!</p> |
| <p>Operators are only loaded by Airflow if they are assigned to a DAG.</p> |
| <p>See <a class="reference internal" href="howto/operator/index.html"><span class="doc">Using Operators</span></a> for how to use Airflow operators.</p> |
| <div class="section" id="dag-assignment"> |
| <h4>DAG Assignment<a class="headerlink" href="#dag-assignment" title="Permalink to this headline">¶</a></h4> |
| <p><em>Added in Airflow 1.8</em></p> |
| <p>Operators do not have to be assigned to DAGs immediately (previously <code class="docutils literal notranslate"><span class="pre">dag</span></code> was |
| a required argument). However, once an operator is assigned to a DAG, it cannot |
| be transferred or unassigned. DAG assignment can be done explicitly when the |
| operator is created, through deferred assignment, or even inferred from other |
| operators.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'my_dag'</span><span class="p">,</span> <span class="n">start_date</span><span class="o">=</span><span class="n">datetime</span><span class="p">(</span><span class="mi">2016</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">))</span> |
| |
| <span class="c1"># sets the DAG explicitly</span> |
| <span class="n">explicit_op</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'op1'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| |
| <span class="c1"># deferred DAG assignment</span> |
| <span class="n">deferred_op</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'op2'</span><span class="p">)</span> |
| <span class="n">deferred_op</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">dag</span> |
| |
| <span class="c1"># inferred DAG assignment (linked operators must be in the same DAG)</span> |
| <span class="n">inferred_op</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'op3'</span><span class="p">)</span> |
| <span class="n">inferred_op</span><span class="o">.</span><span class="n">set_upstream</span><span class="p">(</span><span class="n">deferred_op</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| </div> |
| <div class="section" id="bitshift-composition"> |
| <h4>Bitshift Composition<a class="headerlink" href="#bitshift-composition" title="Permalink to this headline">¶</a></h4> |
| <p><em>Added in Airflow 1.8</em></p> |
| <p>We recommend setting operator relationships with bitshift operators rather than <code class="docutils literal notranslate"><span class="pre">set_upstream()</span></code> |
| and <code class="docutils literal notranslate"><span class="pre">set_downstream()</span></code>.</p> |
| <p>Traditionally, operator relationships are set with the <code class="docutils literal notranslate"><span class="pre">set_upstream()</span></code> and |
| <code class="docutils literal notranslate"><span class="pre">set_downstream()</span></code> methods. In Airflow 1.8, this can be done with the Python |
| bitshift operators <code class="docutils literal notranslate"><span class="pre">&gt;&gt;</span></code> and <code class="docutils literal notranslate"><span class="pre">&lt;&lt;</span></code>. The following four statements are all |
| functionally equivalent:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">op1</span> <span class="o">>></span> <span class="n">op2</span> |
| <span class="n">op1</span><span class="o">.</span><span class="n">set_downstream</span><span class="p">(</span><span class="n">op2</span><span class="p">)</span> |
| |
| <span class="n">op2</span> <span class="o">&lt;&lt;</span> <span class="n">op1</span> |
| <span class="n">op2</span><span class="o">.</span><span class="n">set_upstream</span><span class="p">(</span><span class="n">op1</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>When using the bitshift to compose operators, the relationship is set in the |
| direction that the bitshift operator points. For example, <code class="docutils literal notranslate"><span class="pre">op1</span> <span class="pre">>></span> <span class="pre">op2</span></code> means |
| that <code class="docutils literal notranslate"><span class="pre">op1</span></code> runs first and <code class="docutils literal notranslate"><span class="pre">op2</span></code> runs second. Multiple operators can be |
| composed – keep in mind the chain is executed left-to-right and the rightmost |
| object is always returned. For example:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">op1</span> <span class="o">&gt;&gt;</span> <span class="n">op2</span> <span class="o">&gt;&gt;</span> <span class="n">op3</span> <span class="o">&lt;&lt;</span> <span class="n">op4</span> |
| </pre></div> |
| </div> |
| <p>is equivalent to:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">op1</span><span class="o">.</span><span class="n">set_downstream</span><span class="p">(</span><span class="n">op2</span><span class="p">)</span> |
| <span class="n">op2</span><span class="o">.</span><span class="n">set_downstream</span><span class="p">(</span><span class="n">op3</span><span class="p">)</span> |
| <span class="n">op3</span><span class="o">.</span><span class="n">set_upstream</span><span class="p">(</span><span class="n">op4</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>For convenience, the bitshift operators can also be used with DAGs. For example:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">dag</span> <span class="o">>></span> <span class="n">op1</span> <span class="o">>></span> <span class="n">op2</span> |
| </pre></div> |
| </div> |
| <p>is equivalent to:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">op1</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">dag</span> |
| <span class="n">op1</span><span class="o">.</span><span class="n">set_downstream</span><span class="p">(</span><span class="n">op2</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>We can put this all together to build a simple pipeline:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">with</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'my_dag'</span><span class="p">,</span> <span class="n">start_date</span><span class="o">=</span><span class="n">datetime</span><span class="p">(</span><span class="mi">2016</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">))</span> <span class="k">as</span> <span class="n">dag</span><span class="p">:</span> |
| <span class="p">(</span> |
| <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'dummy_1'</span><span class="p">)</span> |
| <span class="o">>></span> <span class="n">BashOperator</span><span class="p">(</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="s1">'bash_1'</span><span class="p">,</span> |
| <span class="n">bash_command</span><span class="o">=</span><span class="s1">'echo "HELLO!"'</span><span class="p">)</span> |
| <span class="o">>></span> <span class="n">PythonOperator</span><span class="p">(</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="s1">'python_1'</span><span class="p">,</span> |
| <span class="n">python_callable</span><span class="o">=</span><span class="k">lambda</span><span class="p">:</span> <span class="k">print</span><span class="p">(</span><span class="s2">"GOODBYE!"</span><span class="p">))</span> |
| <span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>Bitshift can also be used with lists. For example:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">op1</span> <span class="o">>></span> <span class="p">[</span><span class="n">op2</span><span class="p">,</span> <span class="n">op3</span><span class="p">]</span> <span class="o">>></span> <span class="n">op4</span> |
| </pre></div> |
| </div> |
| <p>is equivalent to:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">op1</span> <span class="o">>></span> <span class="n">op2</span> <span class="o">>></span> <span class="n">op4</span> |
| <span class="n">op1</span> <span class="o">>></span> <span class="n">op3</span> <span class="o">>></span> <span class="n">op4</span> |
| </pre></div> |
| </div> |
| <p>and equivalent to:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">op1</span><span class="o">.</span><span class="n">set_downstream</span><span class="p">([</span><span class="n">op2</span><span class="p">,</span> <span class="n">op3</span><span class="p">])</span> |
| </pre></div> |
| </div> |
| </div> |
| <div class="section" id="relationship-helper"> |
| <h4>Relationship Helper<a class="headerlink" href="#relationship-helper" title="Permalink to this headline">¶</a></h4> |
| <p>The <code class="docutils literal notranslate"><span class="pre">chain</span></code> and <code class="docutils literal notranslate"><span class="pre">cross_downstream</span></code> functions provide easier ways to set relationships |
| between operators in specific situations.</p> |
| <p>When setting relationships between two lists of operators, where every operator in |
| the first list must be upstream of every operator in the second list, we have to split |
| one list manually using bitshift composition:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="p">[</span><span class="n">op1</span><span class="p">,</span> <span class="n">op2</span><span class="p">,</span> <span class="n">op3</span><span class="p">]</span> <span class="o">>></span> <span class="n">op4</span> |
| <span class="p">[</span><span class="n">op1</span><span class="p">,</span> <span class="n">op2</span><span class="p">,</span> <span class="n">op3</span><span class="p">]</span> <span class="o">>></span> <span class="n">op5</span> |
| <span class="p">[</span><span class="n">op1</span><span class="p">,</span> <span class="n">op2</span><span class="p">,</span> <span class="n">op3</span><span class="p">]</span> <span class="o">>></span> <span class="n">op6</span> |
| </pre></div> |
| </div> |
| <p><code class="docutils literal notranslate"><span class="pre">cross_downstream</span></code> handles list relationships more easily:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">cross_downstream</span><span class="p">([</span><span class="n">op1</span><span class="p">,</span> <span class="n">op2</span><span class="p">,</span> <span class="n">op3</span><span class="p">],</span> <span class="p">[</span><span class="n">op4</span><span class="p">,</span> <span class="n">op5</span><span class="p">,</span> <span class="n">op6</span><span class="p">])</span> |
| </pre></div> |
| </div> |
| <p>When setting single-direction relationships between many operators, we could |
| concatenate them with bitshift composition:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">op1</span> <span class="o">>></span> <span class="n">op2</span> <span class="o">>></span> <span class="n">op3</span> <span class="o">>></span> <span class="n">op4</span> <span class="o">>></span> <span class="n">op5</span> |
| </pre></div> |
| </div> |
| <p>Using <code class="docutils literal notranslate"><span class="pre">chain</span></code> achieves the same result:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">chain</span><span class="p">(</span><span class="n">op1</span><span class="p">,</span> <span class="n">op2</span><span class="p">,</span> <span class="n">op3</span><span class="p">,</span> <span class="n">op4</span><span class="p">,</span> <span class="n">op5</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>It works even without naming each operator:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">chain</span><span class="p">([</span><span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'op'</span> <span class="o">+</span> <span class="n">i</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="mi">1</span><span class="p">,</span> <span class="mi">6</span><span class="p">)])</span> |
| </pre></div> |
| </div> |
| <p><code class="docutils literal notranslate"><span class="pre">chain</span></code> can also handle lists of operators:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">chain</span><span class="p">(</span><span class="n">op1</span><span class="p">,</span> <span class="p">[</span><span class="n">op2</span><span class="p">,</span> <span class="n">op3</span><span class="p">],</span> <span class="n">op4</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>is equivalent to:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">op1</span> <span class="o">>></span> <span class="p">[</span><span class="n">op2</span><span class="p">,</span> <span class="n">op3</span><span class="p">]</span> <span class="o">>></span> <span class="n">op4</span> |
| </pre></div> |
| </div> |
| <p>When <code class="docutils literal notranslate"><span class="pre">chain</span></code> sets relationships between two lists of operators, |
| the lists must be the same size:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">chain</span><span class="p">(</span><span class="n">op1</span><span class="p">,</span> <span class="p">[</span><span class="n">op2</span><span class="p">,</span> <span class="n">op3</span><span class="p">],</span> <span class="p">[</span><span class="n">op4</span><span class="p">,</span> <span class="n">op5</span><span class="p">],</span> <span class="n">op6</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>is equivalent to:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">op1</span> <span class="o">>></span> <span class="p">[</span><span class="n">op2</span><span class="p">,</span> <span class="n">op3</span><span class="p">]</span> |
| <span class="n">op2</span> <span class="o">>></span> <span class="n">op4</span> |
| <span class="n">op3</span> <span class="o">>></span> <span class="n">op5</span> |
| <span class="p">[</span><span class="n">op4</span><span class="p">,</span> <span class="n">op5</span><span class="p">]</span> <span class="o">>></span> <span class="n">op6</span> |
| </pre></div> |
| </div> |
| </div> |
| </div> |
| <div class="section" id="tasks"> |
| <h3>Tasks<a class="headerlink" href="#tasks" title="Permalink to this headline">¶</a></h3> |
| <p>Once an operator is instantiated, it is referred to as a “task”. The |
| instantiation defines specific values when calling the abstract operator, and |
| the parameterized task becomes a node in a DAG.</p> |
| </div> |
| <div class="section" id="task-instances"> |
| <h3>Task Instances<a class="headerlink" href="#task-instances" title="Permalink to this headline">¶</a></h3> |
| <p>A task instance represents a specific run of a task and is characterized as the |
| combination of a DAG, a task, and a point in time. Task instances also have an |
| indicative state, which could be “running”, “success”, “failed”, “skipped”, “up |
| for retry”, etc.</p> |
| </div> |
| <div class="section" id="task-lifecycle"> |
| <h3>Task Lifecycle<a class="headerlink" href="#task-lifecycle" title="Permalink to this headline">¶</a></h3> |
| <p>A task goes through various stages from start to completion. In the Airflow UI |
| (graph and tree views), these stages are displayed by a color representing each |
| stage:</p> |
| <img alt="_images/task_lifecycle.png" src="_images/task_lifecycle.png" /> |
| <p>The happy flow consists of the following stages:</p> |
| <ol class="arabic simple"> |
| <li><p>no status (scheduler created empty task instance)</p></li> |
| <li><p>queued (scheduler placed a task to run on the queue)</p></li> |
| <li><p>running (worker picked up a task and is now running it)</p></li> |
| <li><p>success (task completed)</p></li> |
| </ol> |
| <p>There is also a visual difference between scheduled and manually triggered |
| DAGs/tasks:</p> |
| <img alt="_images/task_manual_vs_scheduled.png" src="_images/task_manual_vs_scheduled.png" /> |
| <p>The DAGs/tasks with a black border are scheduled runs, whereas the non-bordered |
| DAGs/tasks are manually triggered, i.e. by <code class="docutils literal notranslate"><span class="pre">airflow</span> <span class="pre">trigger_dag</span></code>.</p> |
| </div> |
| <div class="section" id="workflows"> |
| <h3>Workflows<a class="headerlink" href="#workflows" title="Permalink to this headline">¶</a></h3> |
| <p>You’re now familiar with the core building blocks of Airflow. |
| Some of the concepts may sound very similar, but the vocabulary can |
| be conceptualized like this:</p> |
| <ul class="simple"> |
| <li><p>DAG: a description of the order in which work should take place</p></li> |
| <li><p>Operator: a class that acts as a template for carrying out some work</p></li> |
| <li><p>Task: a parameterized instance of an operator</p></li> |
| <li><p>Task Instance: a task that 1) has been assigned to a DAG and 2) has a |
| state associated with a specific run of the DAG</p></li> |
| </ul> |
| <p>By combining <code class="docutils literal notranslate"><span class="pre">DAGs</span></code> and <code class="docutils literal notranslate"><span class="pre">Operators</span></code> to create <code class="docutils literal notranslate"><span class="pre">TaskInstances</span></code>, you can |
| build complex workflows.</p> |
| </div> |
| </div> |
| <div class="section" id="additional-functionality"> |
| <h2>Additional Functionality<a class="headerlink" href="#additional-functionality" title="Permalink to this headline">¶</a></h2> |
| <p>In addition to the core Airflow objects, there are a number of more complex |
| features that enable behaviors like limiting simultaneous access to resources, |
| cross-communication, conditional execution, and more.</p> |
| <div class="section" id="hooks"> |
| <h3>Hooks<a class="headerlink" href="#hooks" title="Permalink to this headline">¶</a></h3> |
| <p>Hooks are interfaces to external platforms and databases like Hive, S3, |
| MySQL, Postgres, HDFS, and Pig. Hooks implement a common interface when |
| possible, and act as a building block for operators. They also use |
| the <code class="docutils literal notranslate"><span class="pre">airflow.models.connection.Connection</span></code> model to retrieve hostnames |
| and authentication information. Hooks keep authentication code and |
| information out of pipelines, centralized in the metadata database.</p> |
| <p>Hooks are also very useful on their own in Python scripts, |
| in Airflow’s airflow.operators.PythonOperator, and in interactive environments |
| like iPython or Jupyter Notebook.</p> |
| </div> |
| <div class="section" id="pools"> |
| <h3>Pools<a class="headerlink" href="#pools" title="Permalink to this headline">¶</a></h3> |
| <p>Some systems can get overwhelmed when too many processes hit them at the same |
| time. Airflow pools can be used to <strong>limit the execution parallelism</strong> on |
| arbitrary sets of tasks. The list of pools is managed in the UI |
| (<code class="docutils literal notranslate"><span class="pre">Menu</span> <span class="pre">-></span> <span class="pre">Admin</span> <span class="pre">-></span> <span class="pre">Pools</span></code>) by giving the pools a name and assigning |
| it a number of worker slots. Tasks can then be associated with |
| one of the existing pools by using the <code class="docutils literal notranslate"><span class="pre">pool</span></code> parameter when |
| creating tasks (i.e., instantiating operators).</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">aggregate_db_message_job</span> <span class="o">=</span> <span class="n">BashOperator</span><span class="p">(</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="s1">'aggregate_db_message_job'</span><span class="p">,</span> |
| <span class="n">execution_timeout</span><span class="o">=</span><span class="n">timedelta</span><span class="p">(</span><span class="n">hours</span><span class="o">=</span><span class="mi">3</span><span class="p">),</span> |
| <span class="n">pool</span><span class="o">=</span><span class="s1">'ep_data_pipeline_db_msg_agg'</span><span class="p">,</span> |
| <span class="n">bash_command</span><span class="o">=</span><span class="n">aggregate_db_message_job_cmd</span><span class="p">,</span> |
| <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| <span class="n">aggregate_db_message_job</span><span class="o">.</span><span class="n">set_upstream</span><span class="p">(</span><span class="n">wait_for_empty_queue</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>The <code class="docutils literal notranslate"><span class="pre">pool</span></code> parameter can |
| be used in conjunction with <code class="docutils literal notranslate"><span class="pre">priority_weight</span></code> to define priorities |
| in the queue, and which tasks get executed first as slots open up in the |
| pool. The default <code class="docutils literal notranslate"><span class="pre">priority_weight</span></code> is <code class="docutils literal notranslate"><span class="pre">1</span></code>, and can be bumped to any |
| number. When sorting the queue to evaluate which task should be executed |
| next, we use the <code class="docutils literal notranslate"><span class="pre">priority_weight</span></code>, summed up with all of the |
| <code class="docutils literal notranslate"><span class="pre">priority_weight</span></code> values from tasks downstream from this task. You can |
| use this to bump a specific important task and the whole path to that task |
| gets prioritized accordingly.</p> |
| <p>Tasks will be scheduled as usual while the slots fill up. Once capacity is |
| reached, runnable tasks get queued and their state will show as such in the |
| UI. As slots free up, queued tasks start running based on the |
| <code class="docutils literal notranslate"><span class="pre">priority_weight</span></code> (of the task and its descendants).</p> |
| <p>Note that if tasks are not given a pool, they are assigned to a default |
| pool <code class="docutils literal notranslate"><span class="pre">default_pool</span></code>. <code class="docutils literal notranslate"><span class="pre">default_pool</span></code> is initialized with 128 slots and |
| can be changed through the UI or CLI (though it cannot be removed).</p> |
| </div> |
| <div class="section" id="connections"> |
| <span id="concepts-connections"></span><h3>Connections<a class="headerlink" href="#connections" title="Permalink to this headline">¶</a></h3> |
| <p>The connection information to external systems is stored in the Airflow |
| metadata database and managed in the UI (<code class="docutils literal notranslate"><span class="pre">Menu</span> <span class="pre">-></span> <span class="pre">Admin</span> <span class="pre">-></span> <span class="pre">Connections</span></code>). |
| A <code class="docutils literal notranslate"><span class="pre">conn_id</span></code> is defined there and hostname / login / password / schema |
| information attached to it. Airflow pipelines can simply refer to the |
| centrally managed <code class="docutils literal notranslate"><span class="pre">conn_id</span></code> without having to hard code any of this |
| information anywhere.</p> |
| <p>Many connections with the same <code class="docutils literal notranslate"><span class="pre">conn_id</span></code> can be defined and when that |
| is the case, and when a <strong>hook</strong> uses the <code class="docutils literal notranslate"><span class="pre">get_connection</span></code> method |
| from <code class="docutils literal notranslate"><span class="pre">BaseHook</span></code>, Airflow will choose one connection randomly, allowing |
| for some basic load balancing and fault tolerance when used in conjunction |
| with retries.</p> |
| <p>Airflow also has the ability to reference connections via environment |
| variables from the operating system. Then connection parameters must |
| be saved in URI format.</p> |
| <p>If connections with the same <code class="docutils literal notranslate"><span class="pre">conn_id</span></code> are defined in both Airflow metadata |
| database and environment variables, only the one in environment variables |
| will be referenced by Airflow (for example, given <code class="docutils literal notranslate"><span class="pre">conn_id</span></code> |
| <code class="docutils literal notranslate"><span class="pre">postgres_master</span></code>, Airflow will search for <code class="docutils literal notranslate"><span class="pre">AIRFLOW_CONN_POSTGRES_MASTER</span></code> |
| in environment variables first and directly reference it if found, |
| before it starts to search in metadata database).</p> |
| <p>Many hooks have a default <code class="docutils literal notranslate"><span class="pre">conn_id</span></code>, where operators using that hook do not |
| need to supply an explicit connection ID. For example, the default |
| <code class="docutils literal notranslate"><span class="pre">conn_id</span></code> for the <a class="reference internal" href="_api/airflow/hooks/postgres_hook/index.html#airflow.hooks.postgres_hook.PostgresHook" title="airflow.hooks.postgres_hook.PostgresHook"><code class="xref py py-class docutils literal notranslate"><span class="pre">PostgresHook</span></code></a> is |
| <code class="docutils literal notranslate"><span class="pre">postgres_default</span></code>.</p> |
| <p>See <a class="reference internal" href="howto/connection/index.html"><span class="doc">Managing Connections</span></a> for how to create and manage connections.</p> |
| </div> |
| <div class="section" id="queues"> |
| <h3>Queues<a class="headerlink" href="#queues" title="Permalink to this headline">¶</a></h3> |
| <p>When using the CeleryExecutor, the Celery queues that tasks are sent to |
| can be specified. <code class="docutils literal notranslate"><span class="pre">queue</span></code> is an attribute of BaseOperator, so any |
| task can be assigned to any queue. The default queue for the environment |
| is defined in the <code class="docutils literal notranslate"><span class="pre">airflow.cfg</span></code>’s <code class="docutils literal notranslate"><span class="pre">celery</span> <span class="pre">-></span> <span class="pre">default_queue</span></code>. This defines |
| the queue that tasks get assigned to when not specified, as well as which |
| queue Airflow workers listen to when started.</p> |
| <p>Workers can listen to one or multiple queues of tasks. When a worker is |
| started (using the command <code class="docutils literal notranslate"><span class="pre">airflow</span> <span class="pre">worker</span></code>), a set of comma-delimited |
| queue names can be specified (e.g. <code class="docutils literal notranslate"><span class="pre">airflow</span> <span class="pre">worker</span> <span class="pre">-q</span> <span class="pre">spark</span></code>). This worker |
| will then only pick up tasks wired to the specified queue(s).</p> |
| <p>This can be useful if you need specialized workers, either from a |
| resource perspective (for say very lightweight tasks where one worker |
| could take thousands of tasks without a problem), or from an environment |
| perspective (you want a worker running from within the Spark cluster |
| itself because it needs a very specific environment and security rights).</p> |
| </div> |
| <div class="section" id="xcoms"> |
| <span id="concepts-xcom"></span><h3>XComs<a class="headerlink" href="#xcoms" title="Permalink to this headline">¶</a></h3> |
| <p>XComs let tasks exchange messages, allowing more nuanced forms of control and |
| shared state. The name is an abbreviation of “cross-communication”. XComs are |
| principally defined by a key, value, and timestamp, but also track attributes |
| like the task/DAG that created the XCom and when it should become visible. Any |
| object that can be pickled can be used as an XCom value, so users should make |
| sure to use objects of appropriate size.</p> |
| <p>XComs can be “pushed” (sent) or “pulled” (received). When a task pushes an |
| XCom, it makes it generally available to other tasks. Tasks can push XComs at |
| any time by calling the <code class="docutils literal notranslate"><span class="pre">xcom_push()</span></code> method. In addition, if a task returns |
| a value (either from its Operator’s <code class="docutils literal notranslate"><span class="pre">execute()</span></code> method, or from a |
| PythonOperator’s <code class="docutils literal notranslate"><span class="pre">python_callable</span></code> function), then an XCom containing that |
| value is automatically pushed.</p> |
| <p>Tasks call <code class="docutils literal notranslate"><span class="pre">xcom_pull()</span></code> to retrieve XComs, optionally applying filters |
| based on criteria like <code class="docutils literal notranslate"><span class="pre">key</span></code>, source <code class="docutils literal notranslate"><span class="pre">task_ids</span></code>, and source <code class="docutils literal notranslate"><span class="pre">dag_id</span></code>. By |
| default, <code class="docutils literal notranslate"><span class="pre">xcom_pull()</span></code> filters for the keys that are automatically given to |
| XComs when they are pushed by being returned from execute functions (as |
| opposed to XComs that are pushed manually).</p> |
| <p>If <code class="docutils literal notranslate"><span class="pre">xcom_pull</span></code> is passed a single string for <code class="docutils literal notranslate"><span class="pre">task_ids</span></code>, then the most |
| recent XCom value from that task is returned; if a list of <code class="docutils literal notranslate"><span class="pre">task_ids</span></code> is |
| passed, then a corresponding list of XCom values is returned.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1"># inside a PythonOperator called 'pushing_task'</span> |
| <span class="k">def</span> <span class="nf">push_function</span><span class="p">():</span> |
| <span class="k">return</span> <span class="n">value</span> |
| |
| <span class="c1"># inside another PythonOperator where provide_context=True</span> |
| <span class="k">def</span> <span class="nf">pull_function</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">):</span> |
| <span class="n">value</span> <span class="o">=</span> <span class="n">context</span><span class="p">[</span><span class="s1">'task_instance'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_pull</span><span class="p">(</span><span class="n">task_ids</span><span class="o">=</span><span class="s1">'pushing_task'</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>It is also possible to pull XCom directly in a template, here’s an example |
| of what this may look like:</p> |
| <div class="highlight-jinja notranslate"><div class="highlight"><pre><span></span><span class="x">SELECT * FROM </span><span class="cp">{{</span> <span class="nv">task_instance.xcom_pull</span><span class="o">(</span><span class="nv">task_ids</span><span class="o">=</span><span class="s1">'foo'</span><span class="o">,</span> <span class="nv">key</span><span class="o">=</span><span class="s1">'table_name'</span><span class="o">)</span> <span class="cp">}}</span><span class="x"></span> |
| </pre></div> |
| </div> |
| <p>Note that XComs are similar to <a class="reference internal" href="#variables">Variables</a>, but are specifically designed |
| for inter-task communication rather than global settings.</p> |
| </div> |
| <div class="section" id="variables"> |
| <h3>Variables<a class="headerlink" href="#variables" title="Permalink to this headline">¶</a></h3> |
| <p>Variables are a generic way to store and retrieve arbitrary content or |
| settings as a simple key value store within Airflow. Variables can be |
| listed, created, updated and deleted from the UI (<code class="docutils literal notranslate"><span class="pre">Admin</span> <span class="pre">-></span> <span class="pre">Variables</span></code>), |
| code or CLI. In addition, json settings files can be bulk uploaded through |
| the UI. While your pipeline code definition and most of your constants |
| and variables should be defined in code and stored in source control, |
| it can be useful to have some variables or configuration items |
| accessible and modifiable through the UI.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">airflow.models</span> <span class="kn">import</span> <span class="n">Variable</span> |
| <span class="n">foo</span> <span class="o">=</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"foo"</span><span class="p">)</span> |
| <span class="n">bar</span> <span class="o">=</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"bar"</span><span class="p">,</span> <span class="n">deserialize_json</span><span class="o">=</span><span class="bp">True</span><span class="p">)</span> |
| <span class="n">baz</span> <span class="o">=</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s2">"baz"</span><span class="p">,</span> <span class="n">default_var</span><span class="o">=</span><span class="bp">None</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>The second call assumes <code class="docutils literal notranslate"><span class="pre">json</span></code> content and will be deserialized into |
| <code class="docutils literal notranslate"><span class="pre">bar</span></code>. Note that <code class="docutils literal notranslate"><span class="pre">Variable</span></code> is a sqlalchemy model and can be used |
| as such. The third call uses the <code class="docutils literal notranslate"><span class="pre">default_var</span></code> parameter with the value |
| <code class="docutils literal notranslate"><span class="pre">None</span></code>, which either returns an existing value or <code class="docutils literal notranslate"><span class="pre">None</span></code> if the variable |
| isn’t defined. The get function will throw a <code class="docutils literal notranslate"><span class="pre">KeyError</span></code> if the variable |
| doesn’t exist and no default is provided.</p> |
| <p>You can use a variable from a jinja template with the syntax :</p> |
| <div class="highlight-bash notranslate"><div class="highlight"><pre><span></span><span class="nb">echo</span> <span class="o">{{</span> var.value.&lt;variable_name&gt; <span class="o">}}</span> |
| </pre></div> |
| </div> |
| <p>or if you need to deserialize a json object from the variable :</p> |
| <div class="highlight-bash notranslate"><div class="highlight"><pre><span></span><span class="nb">echo</span> <span class="o">{{</span> var.json.&lt;variable_name&gt; <span class="o">}}</span> |
| </pre></div> |
| </div> |
| </div> |
| <div class="section" id="branching"> |
| <h3>Branching<a class="headerlink" href="#branching" title="Permalink to this headline">¶</a></h3> |
| <p>Sometimes you need a workflow to branch, or only go down a certain path |
| based on an arbitrary condition which is typically related to something |
| that happened in an upstream task. One way to do this is by using the |
| <code class="docutils literal notranslate"><span class="pre">BranchPythonOperator</span></code>.</p> |
| <p>The <code class="docutils literal notranslate"><span class="pre">BranchPythonOperator</span></code> is much like the PythonOperator except that it |
| expects a python_callable that returns a task_id (or list of task_ids). The |
| task_id returned is followed, and all of the other paths are skipped. |
| The task_id returned by the Python function has to be referencing a task |
| directly downstream from the BranchPythonOperator task.</p> |
| <p>Note that using tasks with <code class="docutils literal notranslate"><span class="pre">depends_on_past=True</span></code> downstream from |
| <code class="docutils literal notranslate"><span class="pre">BranchPythonOperator</span></code> is logically unsound as <code class="docutils literal notranslate"><span class="pre">skipped</span></code> status |
| will invariably block tasks that depend on their past successes. |
| The <code class="docutils literal notranslate"><span class="pre">skipped</span></code> state propagates where all directly upstream tasks are |
| <code class="docutils literal notranslate"><span class="pre">skipped</span></code>.</p> |
| <p>If you want to skip some tasks, keep in mind that you can’t have an empty |
| path; where a path would otherwise be empty, add a dummy task to it.</p> |
| <p>Like this, where the dummy task “branch_false” is skipped</p> |
| <img alt="_images/branch_good.png" src="_images/branch_good.png" /> |
| <p>Not like this, where the join task is skipped</p> |
| <img alt="_images/branch_bad.png" src="_images/branch_bad.png" /> |
| <p>The <code class="docutils literal notranslate"><span class="pre">BranchPythonOperator</span></code> can also be used with XComs, allowing the branching |
| context to dynamically decide which branch to follow based on upstream tasks. |
| For example:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">def</span> <span class="nf">branch_func</span><span class="p">(</span><span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="n">ti</span> <span class="o">=</span> <span class="n">kwargs</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span> |
| <span class="n">xcom_value</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">xcom_pull</span><span class="p">(</span><span class="n">task_ids</span><span class="o">=</span><span class="s1">'start_task'</span><span class="p">))</span> |
| <span class="k">if</span> <span class="n">xcom_value</span> <span class="o">>=</span> <span class="mi">5</span><span class="p">:</span> |
| <span class="k">return</span> <span class="s1">'continue_task'</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="s1">'stop_task'</span> |
| |
| <span class="n">start_op</span> <span class="o">=</span> <span class="n">BashOperator</span><span class="p">(</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="s1">'start_task'</span><span class="p">,</span> |
| <span class="n">bash_command</span><span class="o">=</span><span class="s2">"echo 5"</span><span class="p">,</span> |
| <span class="n">xcom_push</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span> |
| <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| |
| <span class="n">branch_op</span> <span class="o">=</span> <span class="n">BranchPythonOperator</span><span class="p">(</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="s1">'branch_task'</span><span class="p">,</span> |
| <span class="n">provide_context</span><span class="o">=</span><span class="bp">True</span><span class="p">,</span> |
| <span class="n">python_callable</span><span class="o">=</span><span class="n">branch_func</span><span class="p">,</span> |
| <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| |
| <span class="n">continue_op</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'continue_task'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| <span class="n">stop_op</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'stop_task'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| |
| <span class="n">start_op</span> <span class="o">>></span> <span class="n">branch_op</span> <span class="o">>></span> <span class="p">[</span><span class="n">continue_op</span><span class="p">,</span> <span class="n">stop_op</span><span class="p">]</span> |
| </pre></div> |
| </div> |
| <p>If you wish to implement your own operators with branching functionality, you |
| can inherit from <a class="reference internal" href="_api/airflow/operators/branch_operator/index.html#airflow.operators.branch_operator.BaseBranchOperator" title="airflow.operators.branch_operator.BaseBranchOperator"><code class="xref py py-class docutils literal notranslate"><span class="pre">BaseBranchOperator</span></code></a>, |
| which behaves similarly to <code class="docutils literal notranslate"><span class="pre">BranchPythonOperator</span></code> but expects you to provide |
| an implementation of the method <code class="docutils literal notranslate"><span class="pre">choose_branch</span></code>. As with the callable for |
| <code class="docutils literal notranslate"><span class="pre">BranchPythonOperator</span></code>, this method should return the ID of a downstream task, |
| or a list of task IDs, which will be run, and all others will be skipped.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">class</span> <span class="nc">MyBranchOperator</span><span class="p">(</span><span class="n">BaseBranchOperator</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">choose_branch</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Run an extra branch on the first day of the month</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">context</span><span class="p">[</span><span class="s1">'execution_date'</span><span class="p">]</span><span class="o">.</span><span class="n">day</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span> |
| <span class="k">return</span> <span class="p">[</span><span class="s1">'daily_task_id'</span><span class="p">,</span> <span class="s1">'monthly_task_id'</span><span class="p">]</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="s1">'daily_task_id'</span> |
| </pre></div> |
| </div> |
| </div> |
| <div class="section" id="subdags"> |
| <h3>SubDAGs<a class="headerlink" href="#subdags" title="Permalink to this headline">¶</a></h3> |
| <p>SubDAGs are perfect for repeating patterns. Defining a function that returns a |
| DAG object is a nice design pattern when using Airflow.</p> |
| <p>Airbnb uses the <em>stage-check-exchange</em> pattern when loading data. Data is staged |
| in a temporary table, after which data quality checks are performed against |
| that table. Once the checks all pass the partition is moved into the production |
| table.</p> |
| <p>As another example, consider the following DAG:</p> |
| <img alt="_images/subdag_before.png" src="_images/subdag_before.png" /> |
| <p>We can combine all of the parallel <code class="docutils literal notranslate"><span class="pre">task-*</span></code> operators into a single SubDAG, |
| so that the resulting DAG resembles the following:</p> |
| <img alt="_images/subdag_after.png" src="_images/subdag_after.png" /> |
| <p>Note that SubDAG operators should contain a factory method that returns a DAG |
| object. This will prevent the SubDAG from being treated like a separate DAG in |
| the main UI. For example:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1">#dags/subdag.py</span> |
| <span class="kn">from</span> <span class="nn">airflow.models</span> <span class="kn">import</span> <span class="n">DAG</span> |
| <span class="kn">from</span> <span class="nn">airflow.operators.dummy_operator</span> <span class="kn">import</span> <span class="n">DummyOperator</span> |
| |
| |
| <span class="c1"># Dag is returned by a factory method</span> |
| <span class="k">def</span> <span class="nf">sub_dag</span><span class="p">(</span><span class="n">parent_dag_name</span><span class="p">,</span> <span class="n">child_dag_name</span><span class="p">,</span> <span class="n">start_date</span><span class="p">,</span> <span class="n">schedule_interval</span><span class="p">):</span> |
| <span class="n">dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span> |
| <span class="s1">'</span><span class="si">%s</span><span class="s1">.</span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="p">(</span><span class="n">parent_dag_name</span><span class="p">,</span> <span class="n">child_dag_name</span><span class="p">),</span> |
| <span class="n">schedule_interval</span><span class="o">=</span><span class="n">schedule_interval</span><span class="p">,</span> |
| <span class="n">start_date</span><span class="o">=</span><span class="n">start_date</span><span class="p">,</span> |
| <span class="p">)</span> |
| |
| <span class="n">dummy_operator</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="s1">'dummy_task'</span><span class="p">,</span> |
| <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">,</span> |
| <span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">dag</span> |
| </pre></div> |
| </div> |
| <p>This SubDAG can then be referenced in your main DAG file:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1"># main_dag.py</span> |
| <span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">datetime</span><span class="p">,</span> <span class="n">timedelta</span> |
| <span class="kn">from</span> <span class="nn">airflow.models</span> <span class="kn">import</span> <span class="n">DAG</span> |
| <span class="kn">from</span> <span class="nn">airflow.operators.subdag_operator</span> <span class="kn">import</span> <span class="n">SubDagOperator</span> |
| <span class="kn">from</span> <span class="nn">dags.subdag</span> <span class="kn">import</span> <span class="n">sub_dag</span> |
| |
| |
| <span class="n">PARENT_DAG_NAME</span> <span class="o">=</span> <span class="s1">'parent_dag'</span> |
| <span class="n">CHILD_DAG_NAME</span> <span class="o">=</span> <span class="s1">'child_dag'</span> |
| |
| <span class="n">main_dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="n">PARENT_DAG_NAME</span><span class="p">,</span> |
| <span class="n">schedule_interval</span><span class="o">=</span><span class="n">timedelta</span><span class="p">(</span><span class="n">hours</span><span class="o">=</span><span class="mi">1</span><span class="p">),</span> |
| <span class="n">start_date</span><span class="o">=</span><span class="n">datetime</span><span class="p">(</span><span class="mi">2016</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span> |
| <span class="p">)</span> |
| |
| <span class="n">sub_dag</span> <span class="o">=</span> <span class="n">SubDagOperator</span><span class="p">(</span> |
| <span class="n">subdag</span><span class="o">=</span><span class="n">sub_dag</span><span class="p">(</span><span class="n">PARENT_DAG_NAME</span><span class="p">,</span> <span class="n">CHILD_DAG_NAME</span><span class="p">,</span> <span class="n">main_dag</span><span class="o">.</span><span class="n">start_date</span><span class="p">,</span> |
| <span class="n">main_dag</span><span class="o">.</span><span class="n">schedule_interval</span><span class="p">),</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="n">CHILD_DAG_NAME</span><span class="p">,</span> |
| <span class="n">dag</span><span class="o">=</span><span class="n">main_dag</span><span class="p">,</span> |
| <span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>You can zoom into a SubDagOperator from the graph view of the main DAG to show |
| the tasks contained within the SubDAG:</p> |
| <img alt="_images/subdag_zoom.png" src="_images/subdag_zoom.png" /> |
| <p>Some other tips when using SubDAGs:</p> |
| <ul class="simple"> |
| <li><p>by convention, a SubDAG’s <code class="docutils literal notranslate"><span class="pre">dag_id</span></code> should be prefixed by its parent’s <code class="docutils literal notranslate"><span class="pre">dag_id</span></code> and |
| a dot, as in <code class="docutils literal notranslate"><span class="pre">parent.child</span></code></p></li> |
| <li><p>share arguments between the main DAG and the SubDAG by passing arguments to |
| the SubDAG operator (as demonstrated above)</p></li> |
| <li><p>SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is |
| set to <code class="docutils literal notranslate"><span class="pre">None</span></code> or <code class="docutils literal notranslate"><span class="pre">@once</span></code>, the SubDAG will succeed without having done |
| anything</p></li> |
| <li><p>clearing a SubDagOperator also clears the state of the tasks within</p></li> |
| <li><p>marking success on a SubDagOperator does not affect the state of the tasks |
| within</p></li> |
| <li><p>refrain from using <code class="docutils literal notranslate"><span class="pre">depends_on_past=True</span></code> in tasks within the SubDAG as |
| this can be confusing</p></li> |
| <li><p>it is possible to specify an executor for the SubDAG. It is common to use |
| the SequentialExecutor if you want to run the SubDAG in-process and |
| effectively limit its parallelism to one. Using LocalExecutor can be |
| problematic as it may over-subscribe your worker, running multiple tasks in |
| a single slot</p></li> |
| </ul> |
| <p>See <code class="docutils literal notranslate"><span class="pre">airflow/example_dags</span></code> for a demonstration.</p> |
| </div> |
| <div class="section" id="slas"> |
| <h3>SLAs<a class="headerlink" href="#slas" title="Permalink to this headline">¶</a></h3> |
| <p>Service Level Agreements, or time by which a task or DAG should have |
| succeeded, can be set at a task level as a <code class="docutils literal notranslate"><span class="pre">timedelta</span></code>. If |
| one or many instances have not succeeded by that time, an alert email is sent |
| detailing the list of tasks that missed their SLA. The event is also recorded |
| in the database and made available in the web UI under <code class="docutils literal notranslate"><span class="pre">Browse->SLA</span> <span class="pre">Misses</span></code> |
| where events can be analyzed and documented.</p> |
| <p>SLAs can be configured for scheduled tasks by using the <code class="docutils literal notranslate"><span class="pre">sla</span></code> parameter. |
| In addition to sending alerts to the addresses specified in a task’s <code class="docutils literal notranslate"><span class="pre">email</span></code> parameter, |
| the <code class="docutils literal notranslate"><span class="pre">sla_miss_callback</span></code> specifies an additional <code class="docutils literal notranslate"><span class="pre">Callable</span></code> |
| object to be invoked when the SLA is not met.</p> |
| <div class="section" id="email-configuration"> |
| <h4>Email Configuration<a class="headerlink" href="#email-configuration" title="Permalink to this headline">¶</a></h4> |
| <p>You can configure the email that is being sent in your <code class="docutils literal notranslate"><span class="pre">airflow.cfg</span></code> |
| by setting a <code class="docutils literal notranslate"><span class="pre">subject_template</span></code> and/or a <code class="docutils literal notranslate"><span class="pre">html_content_template</span></code> |
| in the <code class="docutils literal notranslate"><span class="pre">email</span></code> section.</p> |
| <div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="p">[</span><span class="n">email</span><span class="p">]</span> |
| |
| <span class="n">email_backend</span> <span class="o">=</span> <span class="n">airflow</span><span class="o">.</span><span class="n">utils</span><span class="o">.</span><span class="n">email</span><span class="o">.</span><span class="n">send_email_smtp</span> |
| |
| <span class="n">subject_template</span> <span class="o">=</span> <span class="o">/</span><span class="n">path</span><span class="o">/</span><span class="n">to</span><span class="o">/</span><span class="n">my_subject_template_file</span> |
| <span class="n">html_content_template</span> <span class="o">=</span> <span class="o">/</span><span class="n">path</span><span class="o">/</span><span class="n">to</span><span class="o">/</span><span class="n">my_html_content_template_file</span> |
| </pre></div> |
| </div> |
| <p>To access the task’s information you use <a class="reference external" href="http://jinja.pocoo.org/docs/dev/">Jinja Templating</a> in your template files.</p> |
| <p>For example a <code class="docutils literal notranslate"><span class="pre">html_content_template</span></code> file could look like this:</p> |
| <div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="n">Try</span> <span class="p">{{</span><span class="n">try_number</span><span class="p">}}</span> <span class="n">out</span> <span class="n">of</span> <span class="p">{{</span><span class="n">max_tries</span> <span class="o">+</span> <span class="mi">1</span><span class="p">}}</span><span class="o">&lt;</span><span class="n">br</span><span class="o">&gt;</span> |
| <span class="ne">Exception</span><span class="p">:</span><span class="o">&lt;</span><span class="n">br</span><span class="o">&gt;</span><span class="p">{{</span><span class="n">exception_html</span><span class="p">}}</span><span class="o">&lt;</span><span class="n">br</span><span class="o">&gt;</span> |
| <span class="n">Log</span><span class="p">:</span> <span class="o">&lt;</span><span class="n">a</span> <span class="n">href</span><span class="o">=</span><span class="s2">"{{ti.log_url}}"</span><span class="o">&gt;</span><span class="n">Link</span><span class="o">&lt;/</span><span class="n">a</span><span class="o">&gt;&lt;</span><span class="n">br</span><span class="o">&gt;</span> |
| <span class="n">Host</span><span class="p">:</span> <span class="p">{{</span><span class="n">ti</span><span class="o">.</span><span class="n">hostname</span><span class="p">}}</span><span class="o">&lt;</span><span class="n">br</span><span class="o">&gt;</span> |
| <span class="n">Log</span> <span class="n">file</span><span class="p">:</span> <span class="p">{{</span><span class="n">ti</span><span class="o">.</span><span class="n">log_filepath</span><span class="p">}}</span><span class="o">&lt;</span><span class="n">br</span><span class="o">&gt;</span> |
| <span class="n">Mark</span> <span class="n">success</span><span class="p">:</span> <span class="o">&lt;</span><span class="n">a</span> <span class="n">href</span><span class="o">=</span><span class="s2">"{{ti.mark_success_url}}"</span><span class="o">&gt;</span><span class="n">Link</span><span class="o">&lt;/</span><span class="n">a</span><span class="o">&gt;&lt;</span><span class="n">br</span><span class="o">&gt;</span> |
| </pre></div> |
| </div> |
| </div> |
| </div> |
| <div class="section" id="trigger-rules"> |
| <h3>Trigger Rules<a class="headerlink" href="#trigger-rules" title="Permalink to this headline">¶</a></h3> |
| <p>Though the normal workflow behavior is to trigger tasks when all their |
| directly upstream tasks have succeeded, Airflow allows for more complex |
| dependency settings.</p> |
| <p>All operators have a <code class="docutils literal notranslate"><span class="pre">trigger_rule</span></code> argument which defines the rule by which |
| the generated task gets triggered. The default value for <code class="docutils literal notranslate"><span class="pre">trigger_rule</span></code> is |
| <code class="docutils literal notranslate"><span class="pre">all_success</span></code> and can be defined as “trigger this task when all directly |
| upstream tasks have succeeded”. All other rules described here are based |
| on direct parent tasks and are values that can be passed to any operator |
| while creating tasks:</p> |
| <ul class="simple"> |
| <li><p><code class="docutils literal notranslate"><span class="pre">all_success</span></code>: (default) all parents have succeeded</p></li> |
| <li><p><code class="docutils literal notranslate"><span class="pre">all_failed</span></code>: all parents are in a <code class="docutils literal notranslate"><span class="pre">failed</span></code> or <code class="docutils literal notranslate"><span class="pre">upstream_failed</span></code> state</p></li> |
| <li><p><code class="docutils literal notranslate"><span class="pre">all_done</span></code>: all parents are done with their execution</p></li> |
| <li><p><code class="docutils literal notranslate"><span class="pre">one_failed</span></code>: fires as soon as at least one parent has failed; it does not wait for all parents to be done</p></li> |
| <li><p><code class="docutils literal notranslate"><span class="pre">one_success</span></code>: fires as soon as at least one parent succeeds; it does not wait for all parents to be done</p></li> |
| <li><p><code class="docutils literal notranslate"><span class="pre">none_failed</span></code>: all parents have not failed (<code class="docutils literal notranslate"><span class="pre">failed</span></code> or <code class="docutils literal notranslate"><span class="pre">upstream_failed</span></code>) i.e. all parents have succeeded or been skipped</p></li> |
| <li><p><code class="docutils literal notranslate"><span class="pre">none_skipped</span></code>: no parent is in a <code class="docutils literal notranslate"><span class="pre">skipped</span></code> state, i.e. all parents are in a <code class="docutils literal notranslate"><span class="pre">success</span></code>, <code class="docutils literal notranslate"><span class="pre">failed</span></code>, or <code class="docutils literal notranslate"><span class="pre">upstream_failed</span></code> state</p></li> |
| <li><p><code class="docutils literal notranslate"><span class="pre">dummy</span></code>: dependencies are just for show, trigger at will</p></li> |
| </ul> |
| <p>Note that these can be used in conjunction with <code class="docutils literal notranslate"><span class="pre">depends_on_past</span></code> (boolean) |
| that, when set to <code class="docutils literal notranslate"><span class="pre">True</span></code>, keeps a task from getting triggered if the |
| previous schedule for the task hasn’t succeeded.</p> |
| <p>One must be aware of the interaction between trigger rules and skipped tasks |
| at the schedule level. Skipped tasks will cascade through trigger rules |
| <code class="docutils literal notranslate"><span class="pre">all_success</span></code> and <code class="docutils literal notranslate"><span class="pre">all_failed</span></code> but not <code class="docutils literal notranslate"><span class="pre">all_done</span></code>, <code class="docutils literal notranslate"><span class="pre">one_failed</span></code>, <code class="docutils literal notranslate"><span class="pre">one_success</span></code>, |
| <code class="docutils literal notranslate"><span class="pre">none_failed</span></code>, <code class="docutils literal notranslate"><span class="pre">none_skipped</span></code> and <code class="docutils literal notranslate"><span class="pre">dummy</span></code>.</p> |
| <p>For example, consider the following DAG:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1">#dags/branch_without_trigger.py</span> |
| <span class="kn">import</span> <span class="nn">datetime</span> <span class="kn">as</span> <span class="nn">dt</span> |
| |
| <span class="kn">from</span> <span class="nn">airflow.models</span> <span class="kn">import</span> <span class="n">DAG</span> |
| <span class="kn">from</span> <span class="nn">airflow.operators.dummy_operator</span> <span class="kn">import</span> <span class="n">DummyOperator</span> |
| <span class="kn">from</span> <span class="nn">airflow.operators.python_operator</span> <span class="kn">import</span> <span class="n">BranchPythonOperator</span> |
| |
| <span class="n">dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="s1">'branch_without_trigger'</span><span class="p">,</span> |
| <span class="n">schedule_interval</span><span class="o">=</span><span class="s1">'@once'</span><span class="p">,</span> |
| <span class="n">start_date</span><span class="o">=</span><span class="n">dt</span><span class="o">.</span><span class="n">datetime</span><span class="p">(</span><span class="mi">2019</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">28</span><span class="p">)</span> |
| <span class="p">)</span> |
| |
| <span class="n">run_this_first</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'run_this_first'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| <span class="n">branching</span> <span class="o">=</span> <span class="n">BranchPythonOperator</span><span class="p">(</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="s1">'branching'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">,</span> |
| <span class="n">python_callable</span><span class="o">=</span><span class="k">lambda</span><span class="p">:</span> <span class="s1">'branch_a'</span> |
| <span class="p">)</span> |
| |
| <span class="n">branch_a</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'branch_a'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| <span class="n">follow_branch_a</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'follow_branch_a'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| |
| <span class="n">branch_false</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'branch_false'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| |
| <span class="n">join</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'join'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| |
| <span class="n">run_this_first</span> <span class="o">>></span> <span class="n">branching</span> |
| <span class="n">branching</span> <span class="o">>></span> <span class="n">branch_a</span> <span class="o">>></span> <span class="n">follow_branch_a</span> <span class="o">>></span> <span class="n">join</span> |
| <span class="n">branching</span> <span class="o">>></span> <span class="n">branch_false</span> <span class="o">>></span> <span class="n">join</span> |
| </pre></div> |
| </div> |
| <p>In the case of this DAG, <code class="docutils literal notranslate"><span class="pre">join</span></code> is downstream of <code class="docutils literal notranslate"><span class="pre">follow_branch_a</span></code> |
| and <code class="docutils literal notranslate"><span class="pre">branch_false</span></code>. The <code class="docutils literal notranslate"><span class="pre">join</span></code> task will show up as skipped |
| because its <code class="docutils literal notranslate"><span class="pre">trigger_rule</span></code> is set to <code class="docutils literal notranslate"><span class="pre">all_success</span></code> by default and |
| skipped tasks will cascade through <code class="docutils literal notranslate"><span class="pre">all_success</span></code>.</p> |
| <img alt="_images/branch_without_trigger.png" src="_images/branch_without_trigger.png" /> |
| <p>To avoid this, set <code class="docutils literal notranslate"><span class="pre">trigger_rule</span></code> to <code class="docutils literal notranslate"><span class="pre">none_failed</span></code> on the <code class="docutils literal notranslate"><span class="pre">join</span></code> task:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1">#dags/branch_with_trigger.py</span> |
| <span class="o">...</span> |
| <span class="n">join</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'join'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">,</span> <span class="n">trigger_rule</span><span class="o">=</span><span class="s1">'none_failed'</span><span class="p">)</span> |
| <span class="o">...</span> |
| </pre></div> |
| </div> |
| <p>The <code class="docutils literal notranslate"><span class="pre">join</span></code> task will be triggered as soon as |
| <code class="docutils literal notranslate"><span class="pre">branch_false</span></code> has been skipped (a valid completion state) and |
| <code class="docutils literal notranslate"><span class="pre">follow_branch_a</span></code> has succeeded, because skipped tasks <strong>will not</strong> |
| cascade through <code class="docutils literal notranslate"><span class="pre">none_failed</span></code>.</p> |
| <img alt="_images/branch_with_trigger.png" src="_images/branch_with_trigger.png" /> |
| </div> |
| <div class="section" id="latest-run-only"> |
| <h3>Latest Run Only<a class="headerlink" href="#latest-run-only" title="Permalink to this headline">¶</a></h3> |
| <p>Standard workflow behavior involves running a series of tasks for a |
| particular date/time range. Some workflows, however, perform tasks that |
| are independent of run time but need to be run on a schedule, much like a |
| standard cron job. In these cases, backfills or running jobs missed during |
| a pause just wastes CPU cycles.</p> |
| <p>For situations like this, you can use the <code class="docutils literal notranslate"><span class="pre">LatestOnlyOperator</span></code> to skip |
| tasks that are not being run during the most recent scheduled run for a |
| DAG. The <code class="docutils literal notranslate"><span class="pre">LatestOnlyOperator</span></code> skips all downstream tasks, if the time |
| right now is not between its <code class="docutils literal notranslate"><span class="pre">execution_time</span></code> and the next scheduled |
| <code class="docutils literal notranslate"><span class="pre">execution_time</span></code>.</p> |
| <p>For example, consider the following DAG:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1">#dags/latest_only_with_trigger.py</span> |
| <span class="kn">import</span> <span class="nn">datetime</span> <span class="kn">as</span> <span class="nn">dt</span> |
| |
| <span class="kn">from</span> <span class="nn">airflow.models</span> <span class="kn">import</span> <span class="n">DAG</span> |
| <span class="kn">from</span> <span class="nn">airflow.operators.dummy_operator</span> <span class="kn">import</span> <span class="n">DummyOperator</span> |
| <span class="kn">from</span> <span class="nn">airflow.operators.latest_only_operator</span> <span class="kn">import</span> <span class="n">LatestOnlyOperator</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.trigger_rule</span> <span class="kn">import</span> <span class="n">TriggerRule</span> |
| |
| |
| <span class="n">dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="s1">'latest_only_with_trigger'</span><span class="p">,</span> |
| <span class="n">schedule_interval</span><span class="o">=</span><span class="n">dt</span><span class="o">.</span><span class="n">timedelta</span><span class="p">(</span><span class="n">hours</span><span class="o">=</span><span class="mi">1</span><span class="p">),</span> |
| <span class="n">start_date</span><span class="o">=</span><span class="n">dt</span><span class="o">.</span><span class="n">datetime</span><span class="p">(</span><span class="mi">2019</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">28</span><span class="p">),</span> |
| <span class="p">)</span> |
| |
| <span class="n">latest_only</span> <span class="o">=</span> <span class="n">LatestOnlyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'latest_only'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| |
| <span class="n">task1</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'task1'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| <span class="n">task1</span><span class="o">.</span><span class="n">set_upstream</span><span class="p">(</span><span class="n">latest_only</span><span class="p">)</span> |
| |
| <span class="n">task2</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'task2'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| |
| <span class="n">task3</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'task3'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| <span class="n">task3</span><span class="o">.</span><span class="n">set_upstream</span><span class="p">([</span><span class="n">task1</span><span class="p">,</span> <span class="n">task2</span><span class="p">])</span> |
| |
| <span class="n">task4</span> <span class="o">=</span> <span class="n">DummyOperator</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="s1">'task4'</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">,</span> |
| <span class="n">trigger_rule</span><span class="o">=</span><span class="n">TriggerRule</span><span class="o">.</span><span class="n">ALL_DONE</span><span class="p">)</span> |
| <span class="n">task4</span><span class="o">.</span><span class="n">set_upstream</span><span class="p">([</span><span class="n">task1</span><span class="p">,</span> <span class="n">task2</span><span class="p">])</span> |
| </pre></div> |
| </div> |
| <p>In the case of this DAG, the <code class="docutils literal notranslate"><span class="pre">latest_only</span></code> task will show up as skipped |
| for all runs except the latest run. <code class="docutils literal notranslate"><span class="pre">task1</span></code> is directly downstream of |
| <code class="docutils literal notranslate"><span class="pre">latest_only</span></code> and will also skip for all runs except the latest. |
| <code class="docutils literal notranslate"><span class="pre">task2</span></code> is entirely independent of <code class="docutils literal notranslate"><span class="pre">latest_only</span></code> and will run in all |
| scheduled periods. <code class="docutils literal notranslate"><span class="pre">task3</span></code> is downstream of <code class="docutils literal notranslate"><span class="pre">task1</span></code> and <code class="docutils literal notranslate"><span class="pre">task2</span></code> and |
| because of the default <code class="docutils literal notranslate"><span class="pre">trigger_rule</span></code> being <code class="docutils literal notranslate"><span class="pre">all_success</span></code> will receive |
| a cascaded skip from <code class="docutils literal notranslate"><span class="pre">task1</span></code>. <code class="docutils literal notranslate"><span class="pre">task4</span></code> is downstream of <code class="docutils literal notranslate"><span class="pre">task1</span></code> and |
| <code class="docutils literal notranslate"><span class="pre">task2</span></code>. It will be first skipped directly by <code class="docutils literal notranslate"><span class="pre">LatestOnlyOperator</span></code>, |
| even though its <code class="docutils literal notranslate"><span class="pre">trigger_rule</span></code> is set to <code class="docutils literal notranslate"><span class="pre">all_done</span></code>.</p> |
| <img alt="_images/latest_only_with_trigger.png" src="_images/latest_only_with_trigger.png" /> |
| </div> |
| <div class="section" id="zombies-undeads"> |
| <h3>Zombies & Undeads<a class="headerlink" href="#zombies-undeads" title="Permalink to this headline">¶</a></h3> |
| <p>Task instances die all the time, usually as part of their normal life cycle, |
| but sometimes unexpectedly.</p> |
| <p>Zombie tasks are characterized by the absence |
| of a heartbeat (emitted by the job periodically) and a <code class="docutils literal notranslate"><span class="pre">running</span></code> status |
| in the database. They can occur when a worker node can’t reach the database, |
| when Airflow processes are killed externally, or when a node gets rebooted |
| for instance. Zombie killing is performed periodically by the scheduler’s |
| process.</p> |
| <p>Undead processes are characterized by the existence of a process and a matching |
| heartbeat, but Airflow isn’t aware of this task as <code class="docutils literal notranslate"><span class="pre">running</span></code> in the database. |
| This mismatch typically occurs as the state of the database is altered, |
| most likely by deleting rows in the “Task Instances” view in the UI. |
| Tasks are instructed to verify their state as part of the heartbeat routine, |
| and terminate themselves upon figuring out that they are in this “undead” |
| state.</p> |
| </div> |
| <div class="section" id="cluster-policy"> |
| <h3>Cluster Policy<a class="headerlink" href="#cluster-policy" title="Permalink to this headline">¶</a></h3> |
| <p>Your local Airflow settings file can define a <code class="docutils literal notranslate"><span class="pre">policy</span></code> function that |
| has the ability to mutate task attributes based on other task or DAG |
| attributes. It receives a single argument, a reference to the task object, |
| and is expected to alter that task’s attributes.</p> |
| <p>For example, this function could apply a specific queue property when |
| using a specific operator, or enforce a task timeout policy, making sure |
| that no tasks run for more than 48 hours. Here’s an example of what this |
| may look like inside your <code class="docutils literal notranslate"><span class="pre">airflow_settings.py</span></code>:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">def</span> <span class="nf">policy</span><span class="p">(</span><span class="n">task</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">==</span> <span class="s1">'HivePartitionSensor'</span><span class="p">:</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">queue</span> <span class="o">=</span> <span class="s2">"sensor_queue"</span> |
| <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">timeout</span> <span class="o">></span> <span class="n">timedelta</span><span class="p">(</span><span class="n">hours</span><span class="o">=</span><span class="mi">48</span><span class="p">):</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">timeout</span> <span class="o">=</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">hours</span><span class="o">=</span><span class="mi">48</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| </div> |
| <div class="section" id="documentation-notes"> |
| <h3>Documentation & Notes<a class="headerlink" href="#documentation-notes" title="Permalink to this headline">¶</a></h3> |
| <p>It’s possible to add documentation or notes to your DAGs & task objects that |
| become visible in the web interface (“Graph View” for DAGs, “Task Details” for |
| tasks). There is a set of special task attributes that get rendered as rich |
| content if defined:</p> |
| <table class="docutils align-default"> |
| <colgroup> |
| <col style="width: 38%" /> |
| <col style="width: 62%" /> |
| </colgroup> |
| <thead> |
| <tr class="row-odd"><th class="head"><p>attribute</p></th> |
| <th class="head"><p>rendered to</p></th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr class="row-even"><td><p>doc</p></td> |
| <td><p>monospace</p></td> |
| </tr> |
| <tr class="row-odd"><td><p>doc_json</p></td> |
| <td><p>json</p></td> |
| </tr> |
| <tr class="row-even"><td><p>doc_yaml</p></td> |
| <td><p>yaml</p></td> |
| </tr> |
| <tr class="row-odd"><td><p>doc_md</p></td> |
| <td><p>markdown</p></td> |
| </tr> |
| <tr class="row-even"><td><p>doc_rst</p></td> |
| <td><p>reStructuredText</p></td> |
| </tr> |
| </tbody> |
| </table> |
| <p>Please note that for DAGs, doc_md is the only attribute interpreted.</p> |
| <p>This is especially useful if your tasks are built dynamically from |
| configuration files, as it allows you to expose the configuration that led |
| to the related tasks in Airflow.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="sd">"""</span> |
| <span class="sd">### My great DAG</span> |
| <span class="sd">"""</span> |
| |
| <span class="n">dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span><span class="s1">'my_dag'</span><span class="p">,</span> <span class="n">default_args</span><span class="o">=</span><span class="n">default_args</span><span class="p">)</span> |
| <span class="n">dag</span><span class="o">.</span><span class="n">doc_md</span> <span class="o">=</span> <span class="vm">__doc__</span> |
| |
| <span class="n">t</span> <span class="o">=</span> <span class="n">BashOperator</span><span class="p">(</span><span class="s2">"foo"</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| <span class="n">t</span><span class="o">.</span><span class="n">doc_md</span> <span class="o">=</span> <span class="s2">"""</span><span class="se">\</span> |
| <span class="s2">#Title</span> |
| <span class="s2">Here's a [url](www.airbnb.com)</span> |
| <span class="s2">"""</span> |
| </pre></div> |
| </div> |
| <p>This content will get rendered as markdown respectively in the “Graph View” and |
| “Task Details” pages.</p> |
| </div> |
| <div class="section" id="id1"> |
| <span id="id2"></span><h3>Jinja Templating<a class="headerlink" href="#id1" title="Permalink to this headline">¶</a></h3> |
| <p>Airflow leverages the power of |
| <a class="reference external" href="http://jinja.pocoo.org/docs/dev/">Jinja Templating</a> and this can be a |
| powerful tool to use in combination with macros (see the <a class="reference internal" href="macros.html"><span class="doc">Macros reference</span></a> section).</p> |
| <p>For example, say you want to pass the execution date as an environment variable |
| to a Bash script using the <code class="docutils literal notranslate"><span class="pre">BashOperator</span></code>.</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="c1"># The execution date as YYYY-MM-DD</span> |
| <span class="n">date</span> <span class="o">=</span> <span class="s2">"{{ ds }}"</span> |
| <span class="n">t</span> <span class="o">=</span> <span class="n">BashOperator</span><span class="p">(</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="s1">'test_env'</span><span class="p">,</span> |
| <span class="n">bash_command</span><span class="o">=</span><span class="s1">'/tmp/test.sh '</span><span class="p">,</span> |
| <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">,</span> |
| <span class="n">env</span><span class="o">=</span><span class="p">{</span><span class="s1">'EXECUTION_DATE'</span><span class="p">:</span> <span class="n">date</span><span class="p">})</span> |
| </pre></div> |
| </div> |
| <p>Here, <code class="docutils literal notranslate"><span class="pre">{{</span> <span class="pre">ds</span> <span class="pre">}}</span></code> is a macro, and because the <code class="docutils literal notranslate"><span class="pre">env</span></code> parameter of the |
| <code class="docutils literal notranslate"><span class="pre">BashOperator</span></code> is templated with Jinja, the execution date will be available |
| as an environment variable named <code class="docutils literal notranslate"><span class="pre">EXECUTION_DATE</span></code> in your Bash script.</p> |
| <p>You can use Jinja templating with every parameter that is marked as “templated” |
| in the documentation. Template substitution occurs just before the pre_execute |
| function of your operator is called.</p> |
| <p>You can also use Jinja templating with nested fields, as long as these nested fields |
| are marked as templated in the structure they belong to: fields registered in the |
| <code class="docutils literal notranslate"><span class="pre">template_fields</span></code> property will be submitted to template substitution, like the |
| <code class="docutils literal notranslate"><span class="pre">path</span></code> field in the example below:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">class</span> <span class="nc">MyDataReader</span><span class="p">:</span> |
| <span class="n">template_fields</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'path'</span><span class="p">]</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">my_path</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">path</span> <span class="o">=</span> <span class="n">my_path</span> |
| |
| <span class="c1"># [additional code here...]</span> |
| |
| <span class="n">t</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="s1">'transform_data'</span><span class="p">,</span> |
| <span class="n">python_callable</span><span class="o">=</span><span class="n">transform_data</span><span class="p">,</span> |
| <span class="n">op_args</span><span class="o">=</span><span class="p">[</span> |
| <span class="n">MyDataReader</span><span class="p">(</span><span class="s1">'/tmp/{{ ds }}/my_file'</span><span class="p">)</span> |
| <span class="p">],</span> |
| <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <div class="admonition note"> |
| <p class="admonition-title">Note</p> |
| <p><code class="docutils literal notranslate"><span class="pre">template_fields</span></code> property can equally be a class variable or an |
| instance variable.</p> |
| </div> |
| <p>Deep nested fields can also be substituted, as long as all intermediate fields are |
| marked as template fields:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="k">class</span> <span class="nc">MyDataTransformer</span><span class="p">:</span> |
| <span class="n">template_fields</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'reader'</span><span class="p">]</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">my_reader</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">reader</span> <span class="o">=</span> <span class="n">my_reader</span> |
| |
| <span class="c1"># [additional code here...]</span> |
| |
| <span class="k">class</span> <span class="nc">MyDataReader</span><span class="p">:</span> |
| <span class="n">template_fields</span> <span class="o">=</span> <span class="p">[</span><span class="s1">'path'</span><span class="p">]</span> |
| |
| <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">my_path</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">path</span> <span class="o">=</span> <span class="n">my_path</span> |
| |
| <span class="c1"># [additional code here...]</span> |
| |
| <span class="n">t</span> <span class="o">=</span> <span class="n">PythonOperator</span><span class="p">(</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="s1">'transform_data'</span><span class="p">,</span> |
| <span class="n">python_callable</span><span class="o">=</span><span class="n">transform_data</span><span class="p">,</span> |
| <span class="n">op_args</span><span class="o">=</span><span class="p">[</span> |
| <span class="n">MyDataTransformer</span><span class="p">(</span><span class="n">MyDataReader</span><span class="p">(</span><span class="s1">'/tmp/{{ ds }}/my_file'</span><span class="p">))</span> |
| <span class="p">],</span> |
| <span class="n">dag</span><span class="o">=</span><span class="n">dag</span><span class="p">)</span> |
| </pre></div> |
| </div> |
| <p>You can pass custom options to the Jinja <code class="docutils literal notranslate"><span class="pre">Environment</span></code> when creating your DAG. |
| One common usage is to prevent Jinja from dropping a trailing newline from a |
| template string:</p> |
| <div class="highlight-python notranslate"><div class="highlight"><pre><span></span><span class="n">my_dag</span> <span class="o">=</span> <span class="n">DAG</span><span class="p">(</span><span class="n">dag_id</span><span class="o">=</span><span class="s1">'my-dag'</span><span class="p">,</span> |
| <span class="n">jinja_environment_kwargs</span><span class="o">=</span><span class="p">{</span> |
| <span class="s1">'keep_trailing_newline'</span><span class="p">:</span> <span class="bp">True</span><span class="p">,</span> |
| <span class="c1"># some other jinja2 Environment options here</span> |
| <span class="p">})</span> |
| </pre></div> |
| </div> |
| <p>See <a class="reference external" href="https://jinja.palletsprojects.com/en/master/api/#jinja2.Environment">Jinja documentation</a> |
| to find all available options.</p> |
| </div> |
| </div> |
| <div class="section" id="packaged-dags"> |
| <h2>Packaged DAGs<a class="headerlink" href="#packaged-dags" title="Permalink to this headline">¶</a></h2> |
| <p>While often you will specify DAGs in a single <code class="docutils literal notranslate"><span class="pre">.py</span></code> file it might sometimes |
| be required to combine a DAG and its dependencies. For example, you might want |
| to combine several DAGs together to version them together or you might want |
| to manage them together or you might need an extra module that is not available |
| by default on the system you are running Airflow on. To allow this you can create |
| a zip file that contains the DAG(s) in the root of the zip file and have the extra |
| modules unpacked in directories.</p> |
| <p>For instance you can create a zip file that looks like this:</p> |
| <div class="highlight-bash notranslate"><div class="highlight"><pre><span></span>my_dag1.py |
| my_dag2.py |
| package1/__init__.py |
| package1/functions.py |
| </pre></div> |
| </div> |
| <p>Airflow will scan the zip file and try to load <code class="docutils literal notranslate"><span class="pre">my_dag1.py</span></code> and <code class="docutils literal notranslate"><span class="pre">my_dag2.py</span></code>. |
| It will not go into subdirectories as these are considered to be potential |
| packages.</p> |
| <p>In case you would like to add module dependencies to your DAG you basically would |
| do the same, but then it is more suitable to use a virtualenv and pip.</p> |
| <div class="highlight-bash notranslate"><div class="highlight"><pre><span></span>virtualenv zip_dag |
| <span class="nb">source</span> zip_dag/bin/activate |
| |
| mkdir zip_dag_contents |
| <span class="nb">cd</span> zip_dag_contents |
| |
| pip install --install-option<span class="o">=</span><span class="s2">"--install-lib=</span><span class="nv">$PWD</span><span class="s2">"</span> my_useful_package |
| cp ~/my_dag.py . |
| |
| zip -r zip_dag.zip * |
| </pre></div> |
| </div> |
| <div class="admonition note"> |
| <p class="admonition-title">Note</p> |
| <p>the zip file will be inserted at the beginning of module search list |
| (sys.path) and as such it will be available to any other code that resides |
| within the same interpreter.</p> |
| </div> |
| <div class="admonition note"> |
| <p class="admonition-title">Note</p> |
| <p>packaged dags cannot be used with pickling turned on.</p> |
| </div> |
| <div class="admonition note"> |
| <p class="admonition-title">Note</p> |
| <p>packaged dags cannot contain dynamic libraries (e.g. libz.so); these need |
| to be available on the system if a module needs them. In other words, only |
| pure Python modules can be packaged.</p> |
| </div> |
| </div> |
| <div class="section" id="airflowignore"> |
| <h2>.airflowignore<a class="headerlink" href="#airflowignore" title="Permalink to this headline">¶</a></h2> |
| <p>A <code class="docutils literal notranslate"><span class="pre">.airflowignore</span></code> file specifies the directories or files in <code class="docutils literal notranslate"><span class="pre">DAG_FOLDER</span></code> |
| that Airflow should intentionally ignore. Each line in <code class="docutils literal notranslate"><span class="pre">.airflowignore</span></code> |
| specifies a regular expression pattern, and directories or files whose names |
| (not DAG id) match any of the patterns would be ignored (under the hood, |
| <code class="docutils literal notranslate"><span class="pre">re.findall()</span></code> is used to match the pattern). Overall it works like a |
| <code class="docutils literal notranslate"><span class="pre">.gitignore</span></code> file. Use the <code class="docutils literal notranslate"><span class="pre">#</span></code> character to indicate a comment; all |
| characters on a line following a <code class="docutils literal notranslate"><span class="pre">#</span></code> will be ignored.</p> |
| <p><code class="docutils literal notranslate"><span class="pre">.airflowignore</span></code> file should be put in your <code class="docutils literal notranslate"><span class="pre">DAG_FOLDER</span></code>. |
| For example, you can prepare a <code class="docutils literal notranslate"><span class="pre">.airflowignore</span></code> file with contents</p> |
| <div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="n">project_a</span> |
| <span class="n">tenant_</span><span class="p">[</span>\<span class="n">d</span><span class="p">]</span> |
| </pre></div> |
| </div> |
| <p>Then files like “project_a_dag_1.py”, “TESTING_project_a.py”, “tenant_1.py”, |
| “project_a/dag_1.py”, and “tenant_1/dag_1.py” in your <code class="docutils literal notranslate"><span class="pre">DAG_FOLDER</span></code> would be ignored |
| (If a directory’s name matches any of the patterns, this directory and all its subfolders |
| would not be scanned by Airflow at all. This improves efficiency of DAG finding).</p> |
| <p>The scope of a <code class="docutils literal notranslate"><span class="pre">.airflowignore</span></code> file is the directory it is in plus all its subfolders. |
| You can also prepare <code class="docutils literal notranslate"><span class="pre">.airflowignore</span></code> file for a subfolder in <code class="docutils literal notranslate"><span class="pre">DAG_FOLDER</span></code> and it |
| would only be applicable for that subfolder.</p> |
| </div> |
| </div> |
| |
| |
| </div> |
| |
| </div> |
| |
| |
| <footer> |
| |
| <div class="rst-footer-buttons" role="navigation" aria-label="footer navigation"> |
| |
| <a href="profiling.html" class="btn btn-neutral float-right" title="Data Profiling" accesskey="n" rel="next">Next <span class="fa fa-arrow-circle-right"></span></a> |
| |
| |
| <a href="ui.html" class="btn btn-neutral float-left" title="UI / Screenshots" accesskey="p" rel="prev"><span class="fa fa-arrow-circle-left"></span> Previous</a> |
| |
| </div> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| |
| </p> |
| </div> |
| Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| <div class="footer">This page uses <a href="https://analytics.google.com/"> |
| Google Analytics</a> to collect statistics. You can disable it by blocking |
| the JavaScript coming from www.google-analytics.com. Check our |
| <a href="privacy_notice.html">Privacy Policy</a> |
| for more details. |
| <script type="text/javascript"> |
| (function() { |
| var ga = document.createElement('script'); |
| ga.src = ('https:' == document.location.protocol ? |
| 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js'; |
| ga.setAttribute('async', 'true'); |
| var nodes = document.documentElement.childNodes; |
| var i = -1; |
| var node; |
| do { |
| i++; |
| node = nodes[i] |
| } while(node.nodeType !== Node.ELEMENT_NODE); |
| node.appendChild(ga); |
| })(); |
| </script> |
| </div> |
| |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script type="text/javascript"> |
| jQuery(function () { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |