blob: 90b4f3a56b1e80ed467cc025dbc949a1dbba6fd3 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>airflow.models.dag &mdash; Airflow Documentation</title>
<script type="text/javascript" src="../../../_static/js/modernizr.min.js"></script>
<script type="text/javascript" id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script>
<script type="text/javascript" src="../../../_static/jquery.js"></script>
<script type="text/javascript" src="../../../_static/underscore.js"></script>
<script type="text/javascript" src="../../../_static/doctools.js"></script>
<script type="text/javascript" src="../../../_static/language_data.js"></script>
<script type="text/javascript" src="../../../_static/js/theme.js"></script>
<link rel="stylesheet" href="../../../_static/css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" />
<link rel="index" title="Index" href="../../../genindex.html" />
<link rel="search" title="Search" href="../../../search.html" />
<script>
document.addEventListener('DOMContentLoaded', function() {
var el = document.getElementById('changelog');
if (el !== null ) {
// [AIRFLOW-...]
el.innerHTML = el.innerHTML.replace(
/\[(AIRFLOW-[\d]+)\]/g,
`<a href="https://issues.apache.org/jira/browse/$1">[$1]</a>`
);
// (#...)
el.innerHTML = el.innerHTML.replace(
/\(#([\d]+)\)/g,
`<a href="https://github.com/apache/airflow/pull/$1">(#$1)</a>`
);
};
})
</script>
<style>
.example-header {
position: relative;
background: #9AAA7A;
padding: 8px 16px;
margin-bottom: 0;
}
.example-header--with-button {
padding-right: 166px;
}
.example-header:after{
content: '';
display: table;
clear: both;
}
.example-title {
display:block;
padding: 4px;
margin-right: 16px;
color: white;
overflow-x: auto;
}
.example-header-button {
top: 8px;
right: 16px;
position: absolute;
}
.example-header + .highlight-python {
margin-top: 0 !important;
}
.viewcode-button {
display: inline-block;
padding: 8px 16px;
border: 0;
margin: 0;
outline: 0;
border-radius: 2px;
-webkit-box-shadow: 0 3px 5px 0 rgba(0,0,0,.3);
box-shadow: 0 3px 6px 0 rgba(0,0,0,.3);
color: #404040;
background-color: #e7e7e7;
cursor: pointer;
font-size: 16px;
font-weight: 500;
line-height: 1;
text-decoration: none;
text-overflow: ellipsis;
overflow: hidden;
text-transform: uppercase;
-webkit-transition: background-color .2s;
transition: background-color .2s;
vertical-align: middle;
white-space: nowrap;
}
.viewcode-button:visited {
color: #404040;
}
.viewcode-button:hover, .viewcode-button:focus {
color: #404040;
background-color: #d6d6d6;
}
</style>
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../index.html" class="icon icon-home"> Airflow
</a>
<div class="version">
1.10.4
</div>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../project.html">Project</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../license.html">License</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../start.html">Quick Start</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../installation.html">Installation</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../tutorial.html">Tutorial</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../howto/index.html">How-to Guides</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../ui.html">UI / Screenshots</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../concepts.html">Concepts</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../profiling.html">Data Profiling</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../cli.html">Command Line Interface</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../scheduler.html">Scheduling &amp; Triggers</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../plugins.html">Plugins</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../security.html">Security</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../timezone.html">Time zones</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../api.html">Experimental Rest API</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../integration.html">Integration</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../metrics.html">Metrics</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../kubernetes.html">Kubernetes</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../lineage.html">Lineage</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../changelog.html">Changelog</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../faq.html">FAQ</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../macros.html">Macros reference</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../_api/index.html">API Reference</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../index.html">Airflow</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../index.html">Module code</a> &raquo;</li>
<li><a href="../models.html">airflow.models</a> &raquo;</li>
<li>airflow.models.dag</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for airflow.models.dag</h1><div class="highlight"><pre>
<span></span><span class="c1"># -*- coding: utf-8 -*-</span>
<span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span>
<span class="c1"># or more contributor license agreements. See the NOTICE file</span>
<span class="c1"># distributed with this work for additional information</span>
<span class="c1"># regarding copyright ownership. The ASF licenses this file</span>
<span class="c1"># to you under the Apache License, Version 2.0 (the</span>
<span class="c1"># &quot;License&quot;); you may not use this file except in compliance</span>
<span class="c1"># with the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing,</span>
<span class="c1"># software distributed under the License is distributed on an</span>
<span class="c1"># &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
<span class="c1"># KIND, either express or implied. See the License for the</span>
<span class="c1"># specific language governing permissions and limitations</span>
<span class="c1"># under the License.</span>
<span class="kn">from</span> <span class="nn">__future__</span> <span class="k">import</span> <span class="n">print_function</span>
<span class="kn">import</span> <span class="nn">copy</span>
<span class="kn">import</span> <span class="nn">functools</span>
<span class="kn">import</span> <span class="nn">os</span>
<span class="kn">import</span> <span class="nn">pickle</span>
<span class="kn">import</span> <span class="nn">re</span>
<span class="kn">import</span> <span class="nn">sys</span>
<span class="kn">import</span> <span class="nn">traceback</span>
<span class="kn">import</span> <span class="nn">warnings</span>
<span class="kn">from</span> <span class="nn">collections</span> <span class="k">import</span> <span class="n">OrderedDict</span><span class="p">,</span> <span class="n">defaultdict</span>
<span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">timedelta</span><span class="p">,</span> <span class="n">datetime</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="k">import</span> <span class="n">Union</span><span class="p">,</span> <span class="n">Optional</span><span class="p">,</span> <span class="n">Iterable</span><span class="p">,</span> <span class="n">Dict</span><span class="p">,</span> <span class="n">Type</span><span class="p">,</span> <span class="n">Callable</span><span class="p">,</span> <span class="n">List</span>
<span class="kn">import</span> <span class="nn">jinja2</span>
<span class="kn">import</span> <span class="nn">pendulum</span>
<span class="kn">import</span> <span class="nn">six</span>
<span class="kn">from</span> <span class="nn">croniter</span> <span class="k">import</span> <span class="n">croniter</span>
<span class="kn">from</span> <span class="nn">dateutil.relativedelta</span> <span class="k">import</span> <span class="n">relativedelta</span>
<span class="kn">from</span> <span class="nn">future.standard_library</span> <span class="k">import</span> <span class="n">install_aliases</span>
<span class="kn">from</span> <span class="nn">sqlalchemy</span> <span class="k">import</span> <span class="n">Column</span><span class="p">,</span> <span class="n">String</span><span class="p">,</span> <span class="n">Boolean</span><span class="p">,</span> <span class="n">Integer</span><span class="p">,</span> <span class="n">Text</span><span class="p">,</span> <span class="n">func</span><span class="p">,</span> <span class="n">or_</span>
<span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">configuration</span><span class="p">,</span> <span class="n">settings</span><span class="p">,</span> <span class="n">utils</span>
<span class="kn">from</span> <span class="nn">airflow.dag.base_dag</span> <span class="k">import</span> <span class="n">BaseDag</span>
<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowException</span><span class="p">,</span> <span class="n">AirflowDagCycleException</span>
<span class="kn">from</span> <span class="nn">airflow.executors</span> <span class="k">import</span> <span class="n">LocalExecutor</span><span class="p">,</span> <span class="n">get_default_executor</span>
<span class="kn">from</span> <span class="nn">airflow.models.base</span> <span class="k">import</span> <span class="n">Base</span><span class="p">,</span> <span class="n">ID_LEN</span>
<span class="kn">from</span> <span class="nn">airflow.models.dagbag</span> <span class="k">import</span> <span class="n">DagBag</span>
<span class="kn">from</span> <span class="nn">airflow.models.dagpickle</span> <span class="k">import</span> <span class="n">DagPickle</span>
<span class="kn">from</span> <span class="nn">airflow.models.dagrun</span> <span class="k">import</span> <span class="n">DagRun</span>
<span class="kn">from</span> <span class="nn">airflow.models.taskinstance</span> <span class="k">import</span> <span class="n">TaskInstance</span><span class="p">,</span> <span class="n">clear_task_instances</span>
<span class="kn">from</span> <span class="nn">airflow.utils</span> <span class="k">import</span> <span class="n">timezone</span>
<span class="kn">from</span> <span class="nn">airflow.utils.dates</span> <span class="k">import</span> <span class="n">cron_presets</span><span class="p">,</span> <span class="n">date_range</span> <span class="k">as</span> <span class="n">utils_date_range</span>
<span class="kn">from</span> <span class="nn">airflow.utils.db</span> <span class="k">import</span> <span class="n">provide_session</span>
<span class="kn">from</span> <span class="nn">airflow.utils.helpers</span> <span class="k">import</span> <span class="n">validate_key</span>
<span class="kn">from</span> <span class="nn">airflow.utils.log.logging_mixin</span> <span class="k">import</span> <span class="n">LoggingMixin</span>
<span class="kn">from</span> <span class="nn">airflow.utils.sqlalchemy</span> <span class="k">import</span> <span class="n">UtcDateTime</span><span class="p">,</span> <span class="n">Interval</span>
<span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span>
<span class="n">install_aliases</span><span class="p">()</span>
<div class="viewcode-block" id="ScheduleInterval"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.ScheduleInterval">[docs]</a><span class="n">ScheduleInterval</span> <span class="o">=</span> <span class="n">Union</span><span class="p">[</span><span class="nb">str</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">,</span> <span class="n">relativedelta</span><span class="p">]</span></div>
<div class="viewcode-block" id="get_last_dagrun"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.get_last_dagrun">[docs]</a><span class="k">def</span> <span class="nf">get_last_dagrun</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">session</span><span class="p">,</span> <span class="n">include_externally_triggered</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns the last dag run for a dag, None if there was none.</span>
<span class="sd"> Last dag run can be any type of run eg. scheduled or backfilled.</span>
<span class="sd"> Overridden DagRuns are ignored.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">DR</span> <span class="o">=</span> <span class="n">DagRun</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DR</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DR</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">dag_id</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">include_externally_triggered</span><span class="p">:</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DR</span><span class="o">.</span><span class="n">external_trigger</span> <span class="o">==</span> <span class="kc">False</span><span class="p">)</span> <span class="c1"># noqa</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">order_by</span><span class="p">(</span><span class="n">DR</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">desc</span><span class="p">())</span>
<span class="k">return</span> <span class="n">query</span><span class="o">.</span><span class="n">first</span><span class="p">()</span></div>
<div class="viewcode-block" id="DAG"><a class="viewcode-back" href="../../../_api/airflow/models/index.html#airflow.models.dag.DAG">[docs]</a><span class="nd">@functools</span><span class="o">.</span><span class="n">total_ordering</span>
<span class="k">class</span> <span class="nc">DAG</span><span class="p">(</span><span class="n">BaseDag</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> A dag (directed acyclic graph) is a collection of tasks with directional</span>
<span class="sd"> dependencies. A dag also has a schedule, a start date and an end date</span>
<span class="sd"> (optional). For each schedule, (say daily or hourly), the DAG needs to run</span>
<span class="sd"> each individual tasks as their dependencies are met. Certain tasks have</span>
<span class="sd"> the property of depending on their own past, meaning that they can&#39;t run</span>
<span class="sd"> until their previous schedule (and upstream tasks) are completed.</span>
<span class="sd"> DAGs essentially act as namespaces for tasks. A task_id can only be</span>
<span class="sd"> added once to a DAG.</span>
<span class="sd"> :param dag_id: The id of the DAG</span>
<span class="sd"> :type dag_id: str</span>
<span class="sd"> :param description: The description for the DAG to e.g. be shown on the webserver</span>
<span class="sd"> :type description: str</span>
<span class="sd"> :param schedule_interval: Defines how often that DAG runs, this</span>
<span class="sd"> timedelta object gets added to your latest task instance&#39;s</span>
<span class="sd"> execution_date to figure out the next schedule</span>
<span class="sd"> :type schedule_interval: datetime.timedelta or</span>
<span class="sd"> dateutil.relativedelta.relativedelta or str that acts as a cron</span>
<span class="sd"> expression</span>
<span class="sd"> :param start_date: The timestamp from which the scheduler will</span>
<span class="sd"> attempt to backfill</span>
<span class="sd"> :type start_date: datetime.datetime</span>
<span class="sd"> :param end_date: A date beyond which your DAG won&#39;t run, leave to None</span>
<span class="sd"> for open ended scheduling</span>
<span class="sd"> :type end_date: datetime.datetime</span>
<span class="sd"> :param template_searchpath: This list of folders (non relative)</span>
<span class="sd"> defines where jinja will look for your templates. Order matters.</span>
<span class="sd"> Note that jinja/airflow includes the path of your DAG file by</span>
<span class="sd"> default</span>
<span class="sd"> :type template_searchpath: str or list[str]</span>
<span class="sd"> :param template_undefined: Template undefined type.</span>
<span class="sd"> :type template_undefined: jinja2.Undefined</span>
<span class="sd"> :param user_defined_macros: a dictionary of macros that will be exposed</span>
<span class="sd"> in your jinja templates. For example, passing ``dict(foo=&#39;bar&#39;)``</span>
<span class="sd"> to this argument allows you to ``{{ foo }}`` in all jinja</span>
<span class="sd"> templates related to this DAG. Note that you can pass any</span>
<span class="sd"> type of object here.</span>
<span class="sd"> :type user_defined_macros: dict</span>
<span class="sd"> :param user_defined_filters: a dictionary of filters that will be exposed</span>
<span class="sd"> in your jinja templates. For example, passing</span>
<span class="sd"> ``dict(hello=lambda name: &#39;Hello %s&#39; % name)`` to this argument allows</span>
<span class="sd"> you to ``{{ &#39;world&#39; | hello }}`` in all jinja templates related to</span>
<span class="sd"> this DAG.</span>
<span class="sd"> :type user_defined_filters: dict</span>
<span class="sd"> :param default_args: A dictionary of default parameters to be used</span>
<span class="sd"> as constructor keyword parameters when initialising operators.</span>
<span class="sd"> Note that operators have the same hook, and precede those defined</span>
<span class="sd"> here, meaning that if your dict contains `&#39;depends_on_past&#39;: True`</span>
<span class="sd"> here and `&#39;depends_on_past&#39;: False` in the operator&#39;s call</span>
<span class="sd"> `default_args`, the actual value will be `False`.</span>
<span class="sd"> :type default_args: dict</span>
<span class="sd"> :param params: a dictionary of DAG level parameters that are made</span>
<span class="sd"> accessible in templates, namespaced under `params`. These</span>
<span class="sd"> params can be overridden at the task level.</span>
<span class="sd"> :type params: dict</span>
<span class="sd"> :param concurrency: the number of task instances allowed to run</span>
<span class="sd"> concurrently</span>
<span class="sd"> :type concurrency: int</span>
<span class="sd"> :param max_active_runs: maximum number of active DAG runs, beyond this</span>
<span class="sd"> number of DAG runs in a running state, the scheduler won&#39;t create</span>
<span class="sd"> new active DAG runs</span>
<span class="sd"> :type max_active_runs: int</span>
<span class="sd"> :param dagrun_timeout: specify how long a DagRun should be up before</span>
<span class="sd"> timing out / failing, so that new DagRuns can be created. The timeout</span>
<span class="sd"> is only enforced for scheduled DagRuns, and only once the</span>
<span class="sd"> # of active DagRuns == max_active_runs.</span>
<span class="sd"> :type dagrun_timeout: datetime.timedelta</span>
<span class="sd"> :param sla_miss_callback: specify a function to call when reporting SLA</span>
<span class="sd"> timeouts.</span>
<span class="sd"> :type sla_miss_callback: types.FunctionType</span>
<span class="sd"> :param default_view: Specify DAG default view (tree, graph, duration,</span>
<span class="sd"> gantt, landing_times)</span>
<span class="sd"> :type default_view: str</span>
<span class="sd"> :param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT)</span>
<span class="sd"> :type orientation: str</span>
<span class="sd"> :param catchup: Perform scheduler catchup (or only run latest)? Defaults to True</span>
<span class="sd"> :type catchup: bool</span>
<span class="sd"> :param on_failure_callback: A function to be called when a DagRun of this dag fails.</span>
<span class="sd"> A context dictionary is passed as a single parameter to this function.</span>
<span class="sd"> :type on_failure_callback: callable</span>
<span class="sd"> :param on_success_callback: Much like the ``on_failure_callback`` except</span>
<span class="sd"> that it is executed when the dag succeeds.</span>
<span class="sd"> :type on_success_callback: callable</span>
<span class="sd"> :param access_control: Specify optional DAG-level permissions, e.g.,</span>
<span class="sd"> &quot;{&#39;role1&#39;: {&#39;can_dag_read&#39;}, &#39;role2&#39;: {&#39;can_dag_read&#39;, &#39;can_dag_edit&#39;}}&quot;</span>
<span class="sd"> :type access_control: dict</span>
<span class="sd"> :param is_paused_upon_creation: Specifies if the dag is paused when created for the first time.</span>
<span class="sd"> If the dag exists already, this flag will be ignored. If this optional parameter</span>
<span class="sd"> is not specified, the global config setting will be used.</span>
<span class="sd"> :type is_paused_upon_creation: bool or None</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="DAG._comps"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG._comps">[docs]</a> <span class="n">_comps</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">&#39;dag_id&#39;</span><span class="p">,</span>
<span class="s1">&#39;task_ids&#39;</span><span class="p">,</span>
<span class="s1">&#39;parent_dag&#39;</span><span class="p">,</span>
<span class="s1">&#39;start_date&#39;</span><span class="p">,</span>
<span class="s1">&#39;schedule_interval&#39;</span><span class="p">,</span>
<span class="s1">&#39;full_filepath&#39;</span><span class="p">,</span>
<span class="s1">&#39;template_searchpath&#39;</span><span class="p">,</span>
<span class="s1">&#39;last_loaded&#39;</span><span class="p">,</span></div>
<span class="p">}</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">dag_id</span><span class="p">,</span> <span class="c1"># type: str</span>
<span class="n">description</span><span class="o">=</span><span class="s1">&#39;&#39;</span><span class="p">,</span> <span class="c1"># type: str</span>
<span class="n">schedule_interval</span><span class="o">=</span><span class="n">timedelta</span><span class="p">(</span><span class="n">days</span><span class="o">=</span><span class="mi">1</span><span class="p">),</span> <span class="c1"># type: Optional[ScheduleInterval]</span>
<span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[datetime]</span>
<span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[datetime]</span>
<span class="n">full_filepath</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[str]</span>
<span class="n">template_searchpath</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Union[str, Iterable[str]]]</span>
<span class="n">template_undefined</span><span class="o">=</span><span class="n">jinja2</span><span class="o">.</span><span class="n">Undefined</span><span class="p">,</span> <span class="c1"># type: Type[jinja2.Undefined]</span>
<span class="n">user_defined_macros</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span>
<span class="n">user_defined_filters</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span>
<span class="n">default_args</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span>
<span class="n">concurrency</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">getint</span><span class="p">(</span><span class="s1">&#39;core&#39;</span><span class="p">,</span> <span class="s1">&#39;dag_concurrency&#39;</span><span class="p">),</span> <span class="c1"># type: int</span>
<span class="n">max_active_runs</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">getint</span><span class="p">(</span>
<span class="s1">&#39;core&#39;</span><span class="p">,</span> <span class="s1">&#39;max_active_runs_per_dag&#39;</span><span class="p">),</span> <span class="c1"># type: int</span>
<span class="n">dagrun_timeout</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[timedelta]</span>
<span class="n">sla_miss_callback</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Callable]</span>
<span class="n">default_view</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[str]</span>
<span class="n">orientation</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;webserver&#39;</span><span class="p">,</span> <span class="s1">&#39;dag_orientation&#39;</span><span class="p">),</span> <span class="c1"># type: str</span>
<span class="n">catchup</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">&#39;scheduler&#39;</span><span class="p">,</span> <span class="s1">&#39;catchup_by_default&#39;</span><span class="p">),</span> <span class="c1"># type: bool</span>
<span class="n">on_success_callback</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Callable]</span>
<span class="n">on_failure_callback</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Callable]</span>
<span class="n">doc_md</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[str]</span>
<span class="n">params</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span>
<span class="n">access_control</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span>
<span class="n">is_paused_upon_creation</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[bool]</span>
<span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_defined_macros</span> <span class="o">=</span> <span class="n">user_defined_macros</span>
<span class="bp">self</span><span class="o">.</span><span class="n">user_defined_filters</span> <span class="o">=</span> <span class="n">user_defined_filters</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_args</span> <span class="o">=</span> <span class="n">copy</span><span class="o">.</span><span class="n">deepcopy</span><span class="p">(</span><span class="n">default_args</span> <span class="ow">or</span> <span class="p">{})</span>
<span class="bp">self</span><span class="o">.</span><span class="n">params</span> <span class="o">=</span> <span class="n">params</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="c1"># merging potentially conflicting default_args[&#39;params&#39;] into params</span>
<span class="k">if</span> <span class="s1">&#39;params&#39;</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">params</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;params&#39;</span><span class="p">])</span>
<span class="k">del</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;params&#39;</span><span class="p">]</span>
<span class="n">validate_key</span><span class="p">(</span><span class="n">dag_id</span><span class="p">)</span>
<span class="c1"># Properties from BaseDag</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_dag_id</span> <span class="o">=</span> <span class="n">dag_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_full_filepath</span> <span class="o">=</span> <span class="n">full_filepath</span> <span class="k">if</span> <span class="n">full_filepath</span> <span class="k">else</span> <span class="s1">&#39;&#39;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_concurrency</span> <span class="o">=</span> <span class="n">concurrency</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pickle_id</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_description</span> <span class="o">=</span> <span class="n">description</span>
<span class="c1"># set file location to caller source path</span>
<span class="bp">self</span><span class="o">.</span><span class="n">fileloc</span> <span class="o">=</span> <span class="n">sys</span><span class="o">.</span><span class="n">_getframe</span><span class="p">()</span><span class="o">.</span><span class="n">f_back</span><span class="o">.</span><span class="n">f_code</span><span class="o">.</span><span class="n">co_filename</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">()</span> <span class="c1"># type: Dict[str, TaskInstance]</span>
<span class="c1"># set timezone from start_date</span>
<span class="k">if</span> <span class="n">start_date</span> <span class="ow">and</span> <span class="n">start_date</span><span class="o">.</span><span class="n">tzinfo</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">timezone</span> <span class="o">=</span> <span class="n">start_date</span><span class="o">.</span><span class="n">tzinfo</span>
<span class="k">elif</span> <span class="s1">&#39;start_date&#39;</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_args</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;start_date&#39;</span><span class="p">]:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;start_date&#39;</span><span class="p">],</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;start_date&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">timezone</span><span class="o">.</span><span class="n">parse</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;start_date&#39;</span><span class="p">])</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">timezone</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;start_date&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">tzinfo</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;timezone&#39;</span><span class="p">)</span> <span class="ow">or</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">timezone</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">timezone</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">TIMEZONE</span>
<span class="c1"># Apply the timezone we settled on to end_date if it wasn&#39;t supplied</span>
<span class="k">if</span> <span class="s1">&#39;end_date&#39;</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_args</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;end_date&#39;</span><span class="p">]:</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;end_date&#39;</span><span class="p">],</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;end_date&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">timezone</span><span class="o">.</span><span class="n">parse</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;end_date&#39;</span><span class="p">],</span> <span class="n">timezone</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">timezone</span><span class="p">)</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="n">start_date</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="n">end_date</span><span class="p">)</span>
<span class="c1"># also convert tasks</span>
<span class="k">if</span> <span class="s1">&#39;start_date&#39;</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;start_date&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;start_date&#39;</span><span class="p">])</span>
<span class="p">)</span>
<span class="k">if</span> <span class="s1">&#39;end_date&#39;</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;end_date&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">default_args</span><span class="p">[</span><span class="s1">&#39;end_date&#39;</span><span class="p">])</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">schedule_interval</span> <span class="o">=</span> <span class="n">schedule_interval</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">schedule_interval</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">)</span> <span class="ow">and</span> <span class="n">schedule_interval</span> <span class="ow">in</span> <span class="n">cron_presets</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span> <span class="o">=</span> <span class="n">cron_presets</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">schedule_interval</span><span class="p">)</span> <span class="c1"># type: Optional[ScheduleInterval]</span>
<span class="k">elif</span> <span class="n">schedule_interval</span> <span class="o">==</span> <span class="s1">&#39;@once&#39;</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span> <span class="o">=</span> <span class="n">schedule_interval</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">template_searchpath</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">):</span>
<span class="n">template_searchpath</span> <span class="o">=</span> <span class="p">[</span><span class="n">template_searchpath</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">template_searchpath</span> <span class="o">=</span> <span class="n">template_searchpath</span>
<span class="bp">self</span><span class="o">.</span><span class="n">template_undefined</span> <span class="o">=</span> <span class="n">template_undefined</span>
<span class="bp">self</span><span class="o">.</span><span class="n">parent_dag</span> <span class="o">=</span> <span class="kc">None</span> <span class="c1"># Gets set when DAGs are loaded</span>
<span class="bp">self</span><span class="o">.</span><span class="n">last_loaded</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">safe_dag_id</span> <span class="o">=</span> <span class="n">dag_id</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;.&#39;</span><span class="p">,</span> <span class="s1">&#39;__dot__&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_active_runs</span> <span class="o">=</span> <span class="n">max_active_runs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dagrun_timeout</span> <span class="o">=</span> <span class="n">dagrun_timeout</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sla_miss_callback</span> <span class="o">=</span> <span class="n">sla_miss_callback</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_default_view</span> <span class="o">=</span> <span class="n">default_view</span>
<span class="bp">self</span><span class="o">.</span><span class="n">orientation</span> <span class="o">=</span> <span class="n">orientation</span>
<span class="bp">self</span><span class="o">.</span><span class="n">catchup</span> <span class="o">=</span> <span class="n">catchup</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_subdag</span> <span class="o">=</span> <span class="kc">False</span> <span class="c1"># DagBag.bag_dag() will set this to True if appropriate</span>
<span class="bp">self</span><span class="o">.</span><span class="n">partial</span> <span class="o">=</span> <span class="kc">False</span>
<span class="bp">self</span><span class="o">.</span><span class="n">on_success_callback</span> <span class="o">=</span> <span class="n">on_success_callback</span>
<span class="bp">self</span><span class="o">.</span><span class="n">on_failure_callback</span> <span class="o">=</span> <span class="n">on_failure_callback</span>
<span class="bp">self</span><span class="o">.</span><span class="n">doc_md</span> <span class="o">=</span> <span class="n">doc_md</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_old_context_manager_dags</span> <span class="o">=</span> <span class="p">[]</span> <span class="c1"># type: Iterable[DAG]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_access_control</span> <span class="o">=</span> <span class="n">access_control</span>
<span class="bp">self</span><span class="o">.</span><span class="n">is_paused_upon_creation</span> <span class="o">=</span> <span class="n">is_paused_upon_creation</span>
<div class="viewcode-block" id="DAG.__repr__"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.__repr__">[docs]</a> <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s2">&quot;&lt;DAG: </span><span class="si">{self.dag_id}</span><span class="s2">&gt;&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span></div>
<div class="viewcode-block" id="DAG.__eq__"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.__eq__">[docs]</a> <span class="k">def</span> <span class="nf">__eq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="k">if</span> <span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">==</span> <span class="nb">type</span><span class="p">(</span><span class="n">other</span><span class="p">)</span> <span class="ow">and</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">dag_id</span><span class="p">):</span>
<span class="c1"># Use getattr() instead of __dict__ as __dict__ doesn&#39;t return</span>
<span class="c1"># correct values for properties.</span>
<span class="k">return</span> <span class="nb">all</span><span class="p">(</span><span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">c</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="o">==</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">other</span><span class="p">,</span> <span class="n">c</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="k">for</span> <span class="n">c</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_comps</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">False</span></div>
<div class="viewcode-block" id="DAG.__ne__"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.__ne__">[docs]</a> <span class="k">def</span> <span class="nf">__ne__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="k">return</span> <span class="ow">not</span> <span class="bp">self</span> <span class="o">==</span> <span class="n">other</span></div>
<div class="viewcode-block" id="DAG.__lt__"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.__lt__">[docs]</a> <span class="k">def</span> <span class="nf">__lt__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">&lt;</span> <span class="n">other</span><span class="o">.</span><span class="n">dag_id</span></div>
<div class="viewcode-block" id="DAG.__hash__"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.__hash__">[docs]</a> <span class="k">def</span> <span class="nf">__hash__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">hash_components</span> <span class="o">=</span> <span class="p">[</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)]</span>
<span class="k">for</span> <span class="n">c</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_comps</span><span class="p">:</span>
<span class="c1"># task_ids returns a list and lists can&#39;t be hashed</span>
<span class="k">if</span> <span class="n">c</span> <span class="o">==</span> <span class="s1">&#39;task_ids&#39;</span><span class="p">:</span>
<span class="n">val</span> <span class="o">=</span> <span class="nb">tuple</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">val</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">c</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="nb">hash</span><span class="p">(</span><span class="n">val</span><span class="p">)</span>
<span class="n">hash_components</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">val</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="n">hash_components</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="nb">repr</span><span class="p">(</span><span class="n">val</span><span class="p">))</span>
<span class="k">return</span> <span class="nb">hash</span><span class="p">(</span><span class="nb">tuple</span><span class="p">(</span><span class="n">hash_components</span><span class="p">))</span></div>
<span class="c1"># Context Manager -----------------------------------------------</span>
<div class="viewcode-block" id="DAG.__enter__"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.__enter__">[docs]</a> <span class="k">def</span> <span class="nf">__enter__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_old_context_manager_dags</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">settings</span><span class="o">.</span><span class="n">CONTEXT_MANAGER_DAG</span><span class="p">)</span>
<span class="n">settings</span><span class="o">.</span><span class="n">CONTEXT_MANAGER_DAG</span> <span class="o">=</span> <span class="bp">self</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="DAG.__exit__"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.__exit__">[docs]</a> <span class="k">def</span> <span class="nf">__exit__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">_type</span><span class="p">,</span> <span class="n">_value</span><span class="p">,</span> <span class="n">_tb</span><span class="p">):</span>
<span class="n">settings</span><span class="o">.</span><span class="n">CONTEXT_MANAGER_DAG</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_old_context_manager_dags</span><span class="o">.</span><span class="n">pop</span><span class="p">()</span></div>
<span class="c1"># /Context Manager ----------------------------------------------</span>
<div class="viewcode-block" id="DAG.get_default_view"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.get_default_view">[docs]</a> <span class="k">def</span> <span class="nf">get_default_view</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;This is only there for backward compatible jinja2 templates&quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">_default_view</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;webserver&#39;</span><span class="p">,</span> <span class="s1">&#39;dag_default_view&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_default_view</span></div>
<div class="viewcode-block" id="DAG.date_range"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.date_range">[docs]</a> <span class="k">def</span> <span class="nf">date_range</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start_date</span><span class="p">,</span> <span class="n">num</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()):</span>
<span class="k">if</span> <span class="n">num</span><span class="p">:</span>
<span class="n">end_date</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">return</span> <span class="n">utils_date_range</span><span class="p">(</span>
<span class="n">start_date</span><span class="o">=</span><span class="n">start_date</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="n">end_date</span><span class="p">,</span>
<span class="n">num</span><span class="o">=</span><span class="n">num</span><span class="p">,</span> <span class="n">delta</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span><span class="p">)</span></div>
<div class="viewcode-block" id="DAG.is_fixed_time_schedule"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.is_fixed_time_schedule">[docs]</a> <span class="k">def</span> <span class="nf">is_fixed_time_schedule</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Figures out if the DAG schedule has a fixed time (e.g. 3 AM).</span>
<span class="sd"> :return: True if the schedule has a fixed time, False if not.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">now</span> <span class="o">=</span> <span class="n">datetime</span><span class="o">.</span><span class="n">now</span><span class="p">()</span>
<span class="n">cron</span> <span class="o">=</span> <span class="n">croniter</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span><span class="p">,</span> <span class="n">now</span><span class="p">)</span>
<span class="n">start</span> <span class="o">=</span> <span class="n">cron</span><span class="o">.</span><span class="n">get_next</span><span class="p">(</span><span class="n">datetime</span><span class="p">)</span>
<span class="n">cron_next</span> <span class="o">=</span> <span class="n">cron</span><span class="o">.</span><span class="n">get_next</span><span class="p">(</span><span class="n">datetime</span><span class="p">)</span>
<span class="k">if</span> <span class="n">cron_next</span><span class="o">.</span><span class="n">minute</span> <span class="o">==</span> <span class="n">start</span><span class="o">.</span><span class="n">minute</span> <span class="ow">and</span> <span class="n">cron_next</span><span class="o">.</span><span class="n">hour</span> <span class="o">==</span> <span class="n">start</span><span class="o">.</span><span class="n">hour</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="k">return</span> <span class="kc">False</span></div>
<div class="viewcode-block" id="DAG.following_schedule"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.following_schedule">[docs]</a> <span class="k">def</span> <span class="nf">following_schedule</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dttm</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Calculates the following schedule for this dag in UTC.</span>
<span class="sd"> :param dttm: utc datetime</span>
<span class="sd"> :return: utc datetime</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">):</span>
<span class="c1"># we don&#39;t want to rely on the transitions created by</span>
<span class="c1"># croniter as they are not always correct</span>
<span class="n">dttm</span> <span class="o">=</span> <span class="n">pendulum</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="n">dttm</span><span class="p">)</span>
<span class="n">naive</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">make_naive</span><span class="p">(</span><span class="n">dttm</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">timezone</span><span class="p">)</span>
<span class="n">cron</span> <span class="o">=</span> <span class="n">croniter</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span><span class="p">,</span> <span class="n">naive</span><span class="p">)</span>
<span class="c1"># We assume that DST transitions happen on the minute/hour</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_fixed_time_schedule</span><span class="p">():</span>
<span class="c1"># relative offset (eg. every 5 minutes)</span>
<span class="n">delta</span> <span class="o">=</span> <span class="n">cron</span><span class="o">.</span><span class="n">get_next</span><span class="p">(</span><span class="n">datetime</span><span class="p">)</span> <span class="o">-</span> <span class="n">naive</span>
<span class="n">following</span> <span class="o">=</span> <span class="n">dttm</span><span class="o">.</span><span class="n">in_timezone</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">timezone</span><span class="p">)</span><span class="o">.</span><span class="n">add_timedelta</span><span class="p">(</span><span class="n">delta</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># absolute (e.g. 3 AM)</span>
<span class="n">naive</span> <span class="o">=</span> <span class="n">cron</span><span class="o">.</span><span class="n">get_next</span><span class="p">(</span><span class="n">datetime</span><span class="p">)</span>
<span class="n">tz</span> <span class="o">=</span> <span class="n">pendulum</span><span class="o">.</span><span class="n">timezone</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">timezone</span><span class="o">.</span><span class="n">name</span><span class="p">)</span>
<span class="n">following</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">make_aware</span><span class="p">(</span><span class="n">naive</span><span class="p">,</span> <span class="n">tz</span><span class="p">)</span>
<span class="k">return</span> <span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="n">following</span><span class="p">)</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">dttm</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span></div>
<div class="viewcode-block" id="DAG.previous_schedule"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.previous_schedule">[docs]</a> <span class="k">def</span> <span class="nf">previous_schedule</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dttm</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Calculates the previous schedule for this dag in UTC</span>
<span class="sd"> :param dttm: utc datetime</span>
<span class="sd"> :return: utc datetime</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">):</span>
<span class="c1"># we don&#39;t want to rely on the transitions created by</span>
<span class="c1"># croniter as they are not always correct</span>
<span class="n">dttm</span> <span class="o">=</span> <span class="n">pendulum</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="n">dttm</span><span class="p">)</span>
<span class="n">naive</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">make_naive</span><span class="p">(</span><span class="n">dttm</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">timezone</span><span class="p">)</span>
<span class="n">cron</span> <span class="o">=</span> <span class="n">croniter</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span><span class="p">,</span> <span class="n">naive</span><span class="p">)</span>
<span class="c1"># We assume that DST transitions happen on the minute/hour</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_fixed_time_schedule</span><span class="p">():</span>
<span class="c1"># relative offset (eg. every 5 minutes)</span>
<span class="n">delta</span> <span class="o">=</span> <span class="n">naive</span> <span class="o">-</span> <span class="n">cron</span><span class="o">.</span><span class="n">get_prev</span><span class="p">(</span><span class="n">datetime</span><span class="p">)</span>
<span class="n">previous</span> <span class="o">=</span> <span class="n">dttm</span><span class="o">.</span><span class="n">in_timezone</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">timezone</span><span class="p">)</span><span class="o">.</span><span class="n">subtract_timedelta</span><span class="p">(</span><span class="n">delta</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># absolute (e.g. 3 AM)</span>
<span class="n">naive</span> <span class="o">=</span> <span class="n">cron</span><span class="o">.</span><span class="n">get_prev</span><span class="p">(</span><span class="n">datetime</span><span class="p">)</span>
<span class="n">tz</span> <span class="o">=</span> <span class="n">pendulum</span><span class="o">.</span><span class="n">timezone</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">timezone</span><span class="o">.</span><span class="n">name</span><span class="p">)</span>
<span class="n">previous</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">make_aware</span><span class="p">(</span><span class="n">naive</span><span class="p">,</span> <span class="n">tz</span><span class="p">)</span>
<span class="k">return</span> <span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="n">previous</span><span class="p">)</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">dttm</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span></div>
<div class="viewcode-block" id="DAG.get_run_dates"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.get_run_dates">[docs]</a> <span class="k">def</span> <span class="nf">get_run_dates</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start_date</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a list of dates between the interval received as parameter using this</span>
<span class="sd"> dag&#39;s schedule interval. Returned dates can be used for execution dates.</span>
<span class="sd"> :param start_date: the start date of the interval</span>
<span class="sd"> :type start_date: datetime</span>
<span class="sd"> :param end_date: the end date of the interval, defaults to timezone.utcnow()</span>
<span class="sd"> :type end_date: datetime</span>
<span class="sd"> :return: a list of dates within the interval following the dag&#39;s schedule</span>
<span class="sd"> :rtype: list</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">run_dates</span> <span class="o">=</span> <span class="p">[]</span>
<span class="n">using_start_date</span> <span class="o">=</span> <span class="n">start_date</span>
<span class="n">using_end_date</span> <span class="o">=</span> <span class="n">end_date</span>
<span class="c1"># dates for dag runs</span>
<span class="n">using_start_date</span> <span class="o">=</span> <span class="n">using_start_date</span> <span class="ow">or</span> <span class="nb">min</span><span class="p">([</span><span class="n">t</span><span class="o">.</span><span class="n">start_date</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">tasks</span><span class="p">])</span>
<span class="n">using_end_date</span> <span class="o">=</span> <span class="n">using_end_date</span> <span class="ow">or</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="c1"># next run date for a subdag isn&#39;t relevant (schedule_interval for subdags</span>
<span class="c1"># is ignored) so we use the dag run&#39;s start date in the case of a subdag</span>
<span class="n">next_run_date</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">normalize_schedule</span><span class="p">(</span><span class="n">using_start_date</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_subdag</span> <span class="k">else</span> <span class="n">using_start_date</span><span class="p">)</span>
<span class="k">while</span> <span class="n">next_run_date</span> <span class="ow">and</span> <span class="n">next_run_date</span> <span class="o">&lt;=</span> <span class="n">using_end_date</span><span class="p">:</span>
<span class="n">run_dates</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">next_run_date</span><span class="p">)</span>
<span class="n">next_run_date</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">following_schedule</span><span class="p">(</span><span class="n">next_run_date</span><span class="p">)</span>
<span class="k">return</span> <span class="n">run_dates</span></div>
<div class="viewcode-block" id="DAG.normalize_schedule"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.normalize_schedule">[docs]</a> <span class="k">def</span> <span class="nf">normalize_schedule</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dttm</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns dttm + interval unless dttm is first interval then it returns dttm</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">following</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">following_schedule</span><span class="p">(</span><span class="n">dttm</span><span class="p">)</span>
<span class="c1"># in case of @once</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">following</span><span class="p">:</span>
<span class="k">return</span> <span class="n">dttm</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">previous_schedule</span><span class="p">(</span><span class="n">following</span><span class="p">)</span> <span class="o">!=</span> <span class="n">dttm</span><span class="p">:</span>
<span class="k">return</span> <span class="n">following</span>
<span class="k">return</span> <span class="n">dttm</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.get_last_dagrun"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.get_last_dagrun">[docs]</a> <span class="k">def</span> <span class="nf">get_last_dagrun</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">include_externally_triggered</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="k">return</span> <span class="n">get_last_dagrun</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span>
<span class="n">include_externally_triggered</span><span class="o">=</span><span class="n">include_externally_triggered</span><span class="p">)</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.dag_id"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.dag_id">[docs]</a> <span class="k">def</span> <span class="nf">dag_id</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dag_id</span></div>
<span class="nd">@dag_id</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">dag_id</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_dag_id</span> <span class="o">=</span> <span class="n">value</span>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.full_filepath"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.full_filepath">[docs]</a> <span class="k">def</span> <span class="nf">full_filepath</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_full_filepath</span></div>
<span class="nd">@full_filepath</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">full_filepath</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_full_filepath</span> <span class="o">=</span> <span class="n">value</span>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.concurrency"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.concurrency">[docs]</a> <span class="k">def</span> <span class="nf">concurrency</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_concurrency</span></div>
<span class="nd">@concurrency</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">concurrency</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_concurrency</span> <span class="o">=</span> <span class="n">value</span>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.access_control"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.access_control">[docs]</a> <span class="k">def</span> <span class="nf">access_control</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_access_control</span></div>
<span class="nd">@access_control</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">access_control</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_access_control</span> <span class="o">=</span> <span class="n">value</span>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.description"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.description">[docs]</a> <span class="k">def</span> <span class="nf">description</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_description</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.pickle_id"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.pickle_id">[docs]</a> <span class="k">def</span> <span class="nf">pickle_id</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_pickle_id</span></div>
<span class="nd">@pickle_id</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">pickle_id</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_pickle_id</span> <span class="o">=</span> <span class="n">value</span>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.tasks"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.tasks">[docs]</a> <span class="k">def</span> <span class="nf">tasks</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="o">.</span><span class="n">values</span><span class="p">())</span></div>
<span class="nd">@tasks</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">tasks</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">val</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">AttributeError</span><span class="p">(</span>
<span class="s1">&#39;DAG.tasks can not be modified. Use dag.add_task() instead.&#39;</span><span class="p">)</span>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.task_ids"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.task_ids">[docs]</a> <span class="k">def</span> <span class="nf">task_ids</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.filepath"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.filepath">[docs]</a> <span class="k">def</span> <span class="nf">filepath</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> File location of where the dag object is instantiated</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">fn</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">full_filepath</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="n">settings</span><span class="o">.</span><span class="n">DAGS_FOLDER</span> <span class="o">+</span> <span class="s1">&#39;/&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="n">fn</span> <span class="o">=</span> <span class="n">fn</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">dirname</span><span class="p">(</span><span class="vm">__file__</span><span class="p">)</span> <span class="o">+</span> <span class="s1">&#39;/&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">fn</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.folder"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.folder">[docs]</a> <span class="k">def</span> <span class="nf">folder</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Folder location of where the dag object is instantiated</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">dirname</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">full_filepath</span><span class="p">)</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.owner"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.owner">[docs]</a> <span class="k">def</span> <span class="nf">owner</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return list of all owners found in DAG tasks.</span>
<span class="sd"> :return: Comma separated list of owners in DAG tasks</span>
<span class="sd"> :rtype: str</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="s2">&quot;, &quot;</span><span class="o">.</span><span class="n">join</span><span class="p">({</span><span class="n">t</span><span class="o">.</span><span class="n">owner</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">tasks</span><span class="p">})</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG._get_concurrency_reached"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG._get_concurrency_reached">[docs]</a> <span class="k">def</span> <span class="nf">_get_concurrency_reached</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span>
<span class="n">qry</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">func</span><span class="o">.</span><span class="n">count</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">task_id</span><span class="p">))</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">TI</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">return</span> <span class="n">qry</span><span class="o">.</span><span class="n">scalar</span><span class="p">()</span> <span class="o">&gt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">concurrency</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.concurrency_reached"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.concurrency_reached">[docs]</a> <span class="k">def</span> <span class="nf">concurrency_reached</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a boolean indicating whether the concurrency limit for this DAG</span>
<span class="sd"> has been reached</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_concurrency_reached</span><span class="p">()</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG._get_is_paused"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG._get_is_paused">[docs]</a> <span class="k">def</span> <span class="nf">_get_is_paused</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">qry</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
<span class="k">return</span> <span class="n">qry</span><span class="o">.</span><span class="n">value</span><span class="p">(</span><span class="s1">&#39;is_paused&#39;</span><span class="p">)</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.is_paused"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.is_paused">[docs]</a> <span class="k">def</span> <span class="nf">is_paused</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a boolean indicating whether this DAG is paused</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_is_paused</span><span class="p">()</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.handle_callback"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.handle_callback">[docs]</a> <span class="k">def</span> <span class="nf">handle_callback</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dagrun</span><span class="p">,</span> <span class="n">success</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="n">reason</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Triggers the appropriate callback depending on the value of success, namely the</span>
<span class="sd"> on_failure_callback or on_success_callback. This method gets the context of a</span>
<span class="sd"> single TaskInstance part of this DagRun and passes that to the callable along</span>
<span class="sd"> with a &#39;reason&#39;, primarily to differentiate DagRun failures.</span>
<span class="sd"> .. note: The logs end up in</span>
<span class="sd"> ``$AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log``</span>
<span class="sd"> :param dagrun: DagRun object</span>
<span class="sd"> :param success: Flag to specify if failure or success callback should be called</span>
<span class="sd"> :param reason: Completion reason</span>
<span class="sd"> :param session: Database session</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">callback</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">on_success_callback</span> <span class="k">if</span> <span class="n">success</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">on_failure_callback</span>
<span class="k">if</span> <span class="n">callback</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Executing dag callback function: </span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">callback</span><span class="p">))</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">dagrun</span><span class="o">.</span><span class="n">get_task_instances</span><span class="p">()</span>
<span class="n">ti</span> <span class="o">=</span> <span class="n">tis</span><span class="p">[</span><span class="o">-</span><span class="mi">1</span><span class="p">]</span> <span class="c1"># get first TaskInstance of DagRun</span>
<span class="n">ti</span><span class="o">.</span><span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span>
<span class="n">context</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">get_template_context</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
<span class="n">context</span><span class="o">.</span><span class="n">update</span><span class="p">({</span><span class="s1">&#39;reason&#39;</span><span class="p">:</span> <span class="n">reason</span><span class="p">})</span>
<span class="n">callback</span><span class="p">(</span><span class="n">context</span><span class="p">)</span></div>
<div class="viewcode-block" id="DAG.get_active_runs"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.get_active_runs">[docs]</a> <span class="k">def</span> <span class="nf">get_active_runs</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a list of dag run execution dates currently running</span>
<span class="sd"> :return: List of execution dates</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">runs</span> <span class="o">=</span> <span class="n">DagRun</span><span class="o">.</span><span class="n">find</span><span class="p">(</span><span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">state</span><span class="o">=</span><span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">)</span>
<span class="n">active_dates</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">run</span> <span class="ow">in</span> <span class="n">runs</span><span class="p">:</span>
<span class="n">active_dates</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">run</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
<span class="k">return</span> <span class="n">active_dates</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.get_num_active_runs"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.get_num_active_runs">[docs]</a> <span class="k">def</span> <span class="nf">get_num_active_runs</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">external_trigger</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns the number of active &quot;running&quot; dag runs</span>
<span class="sd"> :param external_trigger: True for externally triggered active dag runs</span>
<span class="sd"> :type external_trigger: bool</span>
<span class="sd"> :param session:</span>
<span class="sd"> :return: number greater than 0 for active dag runs</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">query</span> <span class="o">=</span> <span class="p">(</span><span class="n">session</span>
<span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagRun</span><span class="p">)</span>
<span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagRun</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
<span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagRun</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">))</span>
<span class="k">if</span> <span class="n">external_trigger</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagRun</span><span class="o">.</span><span class="n">external_trigger</span> <span class="o">==</span> <span class="n">external_trigger</span><span class="p">)</span>
<span class="k">return</span> <span class="n">query</span><span class="o">.</span><span class="n">count</span><span class="p">()</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.get_dagrun"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.get_dagrun">[docs]</a> <span class="k">def</span> <span class="nf">get_dagrun</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns the dag run for a given execution date if it exists, otherwise</span>
<span class="sd"> none.</span>
<span class="sd"> :param execution_date: The execution date of the DagRun to find.</span>
<span class="sd"> :param session:</span>
<span class="sd"> :return: The DagRun if found, otherwise None.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">dagrun</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagRun</span><span class="p">)</span>
<span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">DagRun</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="n">execution_date</span><span class="p">)</span>
<span class="o">.</span><span class="n">first</span><span class="p">())</span>
<span class="k">return</span> <span class="n">dagrun</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG._get_latest_execution_date"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG._get_latest_execution_date">[docs]</a> <span class="k">def</span> <span class="nf">_get_latest_execution_date</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">return</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">func</span><span class="o">.</span><span class="n">max</span><span class="p">(</span><span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span><span class="p">))</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">DagRun</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span></div>
<span class="p">)</span><span class="o">.</span><span class="n">scalar</span><span class="p">()</span>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.latest_execution_date"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.latest_execution_date">[docs]</a> <span class="k">def</span> <span class="nf">latest_execution_date</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns the latest date for which at least one dag run exists</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_latest_execution_date</span><span class="p">()</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.subdags"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.subdags">[docs]</a> <span class="k">def</span> <span class="nf">subdags</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a list of the subdag objects associated to this DAG</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># Check SubDag for class but don&#39;t check class directly</span>
<span class="kn">from</span> <span class="nn">airflow.operators.subdag_operator</span> <span class="k">import</span> <span class="n">SubDagOperator</span>
<span class="n">subdag_lst</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">tasks</span><span class="p">:</span>
<span class="k">if</span> <span class="p">(</span><span class="nb">isinstance</span><span class="p">(</span><span class="n">task</span><span class="p">,</span> <span class="n">SubDagOperator</span><span class="p">)</span> <span class="ow">or</span>
<span class="c1"># TODO remove in Airflow 2.0</span>
<span class="nb">type</span><span class="p">(</span><span class="n">task</span><span class="p">)</span><span class="o">.</span><span class="vm">__name__</span> <span class="o">==</span> <span class="s1">&#39;SubDagOperator&#39;</span><span class="p">):</span>
<span class="n">subdag_lst</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">subdag</span><span class="p">)</span>
<span class="n">subdag_lst</span> <span class="o">+=</span> <span class="n">task</span><span class="o">.</span><span class="n">subdag</span><span class="o">.</span><span class="n">subdags</span>
<span class="k">return</span> <span class="n">subdag_lst</span></div>
<div class="viewcode-block" id="DAG.resolve_template_files"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.resolve_template_files">[docs]</a> <span class="k">def</span> <span class="nf">resolve_template_files</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">tasks</span><span class="p">:</span>
<span class="n">t</span><span class="o">.</span><span class="n">resolve_template_files</span><span class="p">()</span></div>
<div class="viewcode-block" id="DAG.get_template_env"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.get_template_env">[docs]</a> <span class="k">def</span> <span class="nf">get_template_env</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a jinja2 Environment while taking into account the DAGs</span>
<span class="sd"> template_searchpath, user_defined_macros and user_defined_filters</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">searchpath</span> <span class="o">=</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">folder</span><span class="p">]</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">template_searchpath</span><span class="p">:</span>
<span class="n">searchpath</span> <span class="o">+=</span> <span class="bp">self</span><span class="o">.</span><span class="n">template_searchpath</span>
<span class="n">env</span> <span class="o">=</span> <span class="n">jinja2</span><span class="o">.</span><span class="n">Environment</span><span class="p">(</span>
<span class="n">loader</span><span class="o">=</span><span class="n">jinja2</span><span class="o">.</span><span class="n">FileSystemLoader</span><span class="p">(</span><span class="n">searchpath</span><span class="p">),</span>
<span class="n">undefined</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">template_undefined</span><span class="p">,</span>
<span class="n">extensions</span><span class="o">=</span><span class="p">[</span><span class="s2">&quot;jinja2.ext.do&quot;</span><span class="p">],</span>
<span class="n">cache_size</span><span class="o">=</span><span class="mi">0</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">user_defined_macros</span><span class="p">:</span>
<span class="n">env</span><span class="o">.</span><span class="n">globals</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">user_defined_macros</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">user_defined_filters</span><span class="p">:</span>
<span class="n">env</span><span class="o">.</span><span class="n">filters</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">user_defined_filters</span><span class="p">)</span>
<span class="k">return</span> <span class="n">env</span></div>
<div class="viewcode-block" id="DAG.set_dependency"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.set_dependency">[docs]</a> <span class="k">def</span> <span class="nf">set_dependency</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">upstream_task_id</span><span class="p">,</span> <span class="n">downstream_task_id</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Simple utility method to set dependency between two tasks that</span>
<span class="sd"> already have been added to the DAG using add_task()</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">upstream_task_id</span><span class="p">)</span><span class="o">.</span><span class="n">set_downstream</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">downstream_task_id</span><span class="p">))</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.get_task_instances"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.get_task_instances">[docs]</a> <span class="k">def</span> <span class="nf">get_task_instances</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">state</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">start_date</span><span class="p">:</span>
<span class="n">start_date</span> <span class="o">=</span> <span class="p">(</span><span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="o">-</span> <span class="n">timedelta</span><span class="p">(</span><span class="mi">30</span><span class="p">))</span><span class="o">.</span><span class="n">date</span><span class="p">()</span>
<span class="n">start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">make_aware</span><span class="p">(</span>
<span class="n">datetime</span><span class="o">.</span><span class="n">combine</span><span class="p">(</span><span class="n">start_date</span><span class="p">,</span> <span class="n">datetime</span><span class="o">.</span><span class="n">min</span><span class="o">.</span><span class="n">time</span><span class="p">()))</span>
<span class="n">end_date</span> <span class="o">=</span> <span class="n">end_date</span> <span class="ow">or</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TaskInstance</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&gt;=</span> <span class="n">start_date</span><span class="p">,</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&lt;=</span> <span class="n">end_date</span><span class="p">,</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">task_id</span><span class="o">.</span><span class="n">in_</span><span class="p">([</span><span class="n">t</span><span class="o">.</span><span class="n">task_id</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">tasks</span><span class="p">]),</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">state</span><span class="p">:</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">tis</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">state</span><span class="p">)</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">tis</span><span class="o">.</span><span class="n">order_by</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span>
<span class="k">return</span> <span class="n">tis</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="DAG.roots"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.roots">[docs]</a> <span class="k">def</span> <span class="nf">roots</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">[</span><span class="n">t</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">tasks</span> <span class="k">if</span> <span class="ow">not</span> <span class="n">t</span><span class="o">.</span><span class="n">downstream_list</span><span class="p">]</span></div>
<div class="viewcode-block" id="DAG.topological_sort"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.topological_sort">[docs]</a> <span class="k">def</span> <span class="nf">topological_sort</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Sorts tasks in topographical order, such that a task comes after any of its</span>
<span class="sd"> upstream dependencies.</span>
<span class="sd"> Heavily inspired by:</span>
<span class="sd"> http://blog.jupo.org/2012/04/06/topological-sorting-acyclic-directed-graphs/</span>
<span class="sd"> :return: list of tasks in topological order</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># convert into an OrderedDict to speedup lookup while keeping order the same</span>
<span class="n">graph_unsorted</span> <span class="o">=</span> <span class="n">OrderedDict</span><span class="p">((</span><span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">task</span><span class="p">)</span> <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">tasks</span><span class="p">)</span>
<span class="n">graph_sorted</span> <span class="o">=</span> <span class="p">[]</span>
<span class="c1"># special case</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">tasks</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">return</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">graph_sorted</span><span class="p">)</span>
<span class="c1"># Run until the unsorted graph is empty.</span>
<span class="k">while</span> <span class="n">graph_unsorted</span><span class="p">:</span>
<span class="c1"># Go through each of the node/edges pairs in the unsorted</span>
<span class="c1"># graph. If a set of edges doesn&#39;t contain any nodes that</span>
<span class="c1"># haven&#39;t been resolved, that is, that are still in the</span>
<span class="c1"># unsorted graph, remove the pair from the unsorted graph,</span>
<span class="c1"># and append it to the sorted graph. Note here that by using</span>
<span class="c1"># using the items() method for iterating, a copy of the</span>
<span class="c1"># unsorted graph is used, allowing us to modify the unsorted</span>
<span class="c1"># graph as we move through it. We also keep a flag for</span>
<span class="c1"># checking that that graph is acyclic, which is true if any</span>
<span class="c1"># nodes are resolved during each pass through the graph. If</span>
<span class="c1"># not, we need to bail out as the graph therefore can&#39;t be</span>
<span class="c1"># sorted.</span>
<span class="n">acyclic</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">for</span> <span class="n">node</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">graph_unsorted</span><span class="o">.</span><span class="n">values</span><span class="p">()):</span>
<span class="k">for</span> <span class="n">edge</span> <span class="ow">in</span> <span class="n">node</span><span class="o">.</span><span class="n">upstream_list</span><span class="p">:</span>
<span class="k">if</span> <span class="n">edge</span><span class="o">.</span><span class="n">task_id</span> <span class="ow">in</span> <span class="n">graph_unsorted</span><span class="p">:</span>
<span class="k">break</span>
<span class="c1"># no edges in upstream tasks</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">acyclic</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">del</span> <span class="n">graph_unsorted</span><span class="p">[</span><span class="n">node</span><span class="o">.</span><span class="n">task_id</span><span class="p">]</span>
<span class="n">graph_sorted</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">node</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">acyclic</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">&quot;A cyclic dependency occurred in dag: </span><span class="si">{}</span><span class="s2">&quot;</span>
<span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">))</span>
<span class="k">return</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">graph_sorted</span><span class="p">)</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.set_dag_runs_state"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.set_dag_runs_state">[docs]</a> <span class="k">def</span> <span class="nf">set_dag_runs_state</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">state</span><span class="o">=</span><span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="p">):</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagRun</span><span class="p">)</span><span class="o">.</span><span class="n">filter_by</span><span class="p">(</span><span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
<span class="k">if</span> <span class="n">start_date</span><span class="p">:</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&gt;=</span> <span class="n">start_date</span><span class="p">)</span>
<span class="k">if</span> <span class="n">end_date</span><span class="p">:</span>
<span class="n">query</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&lt;=</span> <span class="n">end_date</span><span class="p">)</span>
<span class="n">drs</span> <span class="o">=</span> <span class="n">query</span><span class="o">.</span><span class="n">all</span><span class="p">()</span>
<span class="n">dirty_ids</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">dr</span> <span class="ow">in</span> <span class="n">drs</span><span class="p">:</span>
<span class="n">dr</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span>
<span class="n">dirty_ids</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">dr</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.clear"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.clear">[docs]</a> <span class="k">def</span> <span class="nf">clear</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">only_failed</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">only_running</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">confirm_prompt</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">include_subdags</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">include_parentdag</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">reset_dag_runs</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">dry_run</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">get_tis</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Clears a set of task instances associated with the current dag for</span>
<span class="sd"> a specified date range.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span>
<span class="k">if</span> <span class="n">include_subdags</span><span class="p">:</span>
<span class="c1"># Crafting the right filter for dag_id and task_ids combo</span>
<span class="n">conditions</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">subdags</span> <span class="o">+</span> <span class="p">[</span><span class="bp">self</span><span class="p">]:</span>
<span class="n">conditions</span><span class="o">.</span><span class="n">append</span><span class="p">(</span>
<span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">like</span><span class="p">(</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span> <span class="o">&amp;</span>
<span class="n">TI</span><span class="o">.</span><span class="n">task_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">dag</span><span class="o">.</span><span class="n">task_ids</span><span class="p">)</span>
<span class="p">)</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">tis</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">or_</span><span class="p">(</span><span class="o">*</span><span class="n">conditions</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">tis</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">task_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_ids</span><span class="p">))</span>
<span class="k">if</span> <span class="n">include_parentdag</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_subdag</span><span class="p">:</span>
<span class="n">p_dag</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">parent_dag</span><span class="o">.</span><span class="n">sub_dag</span><span class="p">(</span>
<span class="n">task_regex</span><span class="o">=</span><span class="sa">r</span><span class="s2">&quot;^</span><span class="si">{}</span><span class="s2">$&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;.&#39;</span><span class="p">)[</span><span class="mi">1</span><span class="p">]),</span>
<span class="n">include_upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">include_downstream</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">tis</span><span class="o">.</span><span class="n">union</span><span class="p">(</span><span class="n">p_dag</span><span class="o">.</span><span class="n">clear</span><span class="p">(</span>
<span class="n">start_date</span><span class="o">=</span><span class="n">start_date</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="n">end_date</span><span class="p">,</span>
<span class="n">only_failed</span><span class="o">=</span><span class="n">only_failed</span><span class="p">,</span>
<span class="n">only_running</span><span class="o">=</span><span class="n">only_running</span><span class="p">,</span>
<span class="n">confirm_prompt</span><span class="o">=</span><span class="n">confirm_prompt</span><span class="p">,</span>
<span class="n">include_subdags</span><span class="o">=</span><span class="n">include_subdags</span><span class="p">,</span>
<span class="n">include_parentdag</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">reset_dag_runs</span><span class="o">=</span><span class="n">reset_dag_runs</span><span class="p">,</span>
<span class="n">get_tis</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span>
<span class="p">))</span>
<span class="k">if</span> <span class="n">start_date</span><span class="p">:</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">tis</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&gt;=</span> <span class="n">start_date</span><span class="p">)</span>
<span class="k">if</span> <span class="n">end_date</span><span class="p">:</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">tis</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&lt;=</span> <span class="n">end_date</span><span class="p">)</span>
<span class="k">if</span> <span class="n">only_failed</span><span class="p">:</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">tis</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">or_</span><span class="p">(</span>
<span class="n">TI</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">FAILED</span><span class="p">,</span>
<span class="n">TI</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UPSTREAM_FAILED</span><span class="p">))</span>
<span class="k">if</span> <span class="n">only_running</span><span class="p">:</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">tis</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">)</span>
<span class="k">if</span> <span class="n">get_tis</span><span class="p">:</span>
<span class="k">return</span> <span class="n">tis</span>
<span class="k">if</span> <span class="n">dry_run</span><span class="p">:</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">tis</span><span class="o">.</span><span class="n">all</span><span class="p">()</span>
<span class="n">session</span><span class="o">.</span><span class="n">expunge_all</span><span class="p">()</span>
<span class="k">return</span> <span class="n">tis</span>
<span class="n">count</span> <span class="o">=</span> <span class="n">tis</span><span class="o">.</span><span class="n">count</span><span class="p">()</span>
<span class="n">do_it</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">if</span> <span class="n">count</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">return</span> <span class="mi">0</span>
<span class="k">if</span> <span class="n">confirm_prompt</span><span class="p">:</span>
<span class="n">ti_list</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">join</span><span class="p">([</span><span class="nb">str</span><span class="p">(</span><span class="n">t</span><span class="p">)</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">])</span>
<span class="n">question</span> <span class="o">=</span> <span class="p">(</span>
<span class="s2">&quot;You are about to delete these </span><span class="si">{count}</span><span class="s2"> tasks:</span><span class="se">\n</span><span class="s2">&quot;</span>
<span class="s2">&quot;</span><span class="si">{ti_list}</span><span class="se">\n\n</span><span class="s2">&quot;</span>
<span class="s2">&quot;Are you sure? (yes/no): &quot;</span><span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">count</span><span class="o">=</span><span class="n">count</span><span class="p">,</span> <span class="n">ti_list</span><span class="o">=</span><span class="n">ti_list</span><span class="p">)</span>
<span class="n">do_it</span> <span class="o">=</span> <span class="n">utils</span><span class="o">.</span><span class="n">helpers</span><span class="o">.</span><span class="n">ask_yesno</span><span class="p">(</span><span class="n">question</span><span class="p">)</span>
<span class="k">if</span> <span class="n">do_it</span><span class="p">:</span>
<span class="n">clear_task_instances</span><span class="p">(</span><span class="n">tis</span><span class="o">.</span><span class="n">all</span><span class="p">(),</span>
<span class="n">session</span><span class="p">,</span>
<span class="n">dag</span><span class="o">=</span><span class="bp">self</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">reset_dag_runs</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">set_dag_runs_state</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="n">start_date</span><span class="p">,</span>
<span class="n">end_date</span><span class="o">=</span><span class="n">end_date</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">count</span> <span class="o">=</span> <span class="mi">0</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;Bail. Nothing was cleared.&quot;</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="k">return</span> <span class="n">count</span></div>
<span class="nd">@classmethod</span>
<div class="viewcode-block" id="DAG.clear_dags"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.clear_dags">[docs]</a> <span class="k">def</span> <span class="nf">clear_dags</span><span class="p">(</span>
<span class="bp">cls</span><span class="p">,</span> <span class="n">dags</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">only_failed</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">only_running</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">confirm_prompt</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">include_subdags</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">include_parentdag</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">reset_dag_runs</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">dry_run</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="p">):</span>
<span class="n">all_tis</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="n">dags</span><span class="p">:</span>
<span class="n">tis</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">clear</span><span class="p">(</span>
<span class="n">start_date</span><span class="o">=</span><span class="n">start_date</span><span class="p">,</span>
<span class="n">end_date</span><span class="o">=</span><span class="n">end_date</span><span class="p">,</span>
<span class="n">only_failed</span><span class="o">=</span><span class="n">only_failed</span><span class="p">,</span>
<span class="n">only_running</span><span class="o">=</span><span class="n">only_running</span><span class="p">,</span>
<span class="n">confirm_prompt</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">include_subdags</span><span class="o">=</span><span class="n">include_subdags</span><span class="p">,</span>
<span class="n">include_parentdag</span><span class="o">=</span><span class="n">include_parentdag</span><span class="p">,</span>
<span class="n">reset_dag_runs</span><span class="o">=</span><span class="n">reset_dag_runs</span><span class="p">,</span>
<span class="n">dry_run</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">all_tis</span><span class="o">.</span><span class="n">extend</span><span class="p">(</span><span class="n">tis</span><span class="p">)</span>
<span class="k">if</span> <span class="n">dry_run</span><span class="p">:</span>
<span class="k">return</span> <span class="n">all_tis</span>
<span class="n">count</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">all_tis</span><span class="p">)</span>
<span class="n">do_it</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">if</span> <span class="n">count</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;Nothing to clear.&quot;</span><span class="p">)</span>
<span class="k">return</span> <span class="mi">0</span>
<span class="k">if</span> <span class="n">confirm_prompt</span><span class="p">:</span>
<span class="n">ti_list</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">join</span><span class="p">([</span><span class="nb">str</span><span class="p">(</span><span class="n">t</span><span class="p">)</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">all_tis</span><span class="p">])</span>
<span class="n">question</span> <span class="o">=</span> <span class="p">(</span>
<span class="s2">&quot;You are about to delete these </span><span class="si">{}</span><span class="s2"> tasks:</span><span class="se">\n</span><span class="s2">&quot;</span>
<span class="s2">&quot;</span><span class="si">{}</span><span class="se">\n\n</span><span class="s2">&quot;</span>
<span class="s2">&quot;Are you sure? (yes/no): &quot;</span><span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">count</span><span class="p">,</span> <span class="n">ti_list</span><span class="p">)</span>
<span class="n">do_it</span> <span class="o">=</span> <span class="n">utils</span><span class="o">.</span><span class="n">helpers</span><span class="o">.</span><span class="n">ask_yesno</span><span class="p">(</span><span class="n">question</span><span class="p">)</span>
<span class="k">if</span> <span class="n">do_it</span><span class="p">:</span>
<span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="n">dags</span><span class="p">:</span>
<span class="n">dag</span><span class="o">.</span><span class="n">clear</span><span class="p">(</span><span class="n">start_date</span><span class="o">=</span><span class="n">start_date</span><span class="p">,</span>
<span class="n">end_date</span><span class="o">=</span><span class="n">end_date</span><span class="p">,</span>
<span class="n">only_failed</span><span class="o">=</span><span class="n">only_failed</span><span class="p">,</span>
<span class="n">only_running</span><span class="o">=</span><span class="n">only_running</span><span class="p">,</span>
<span class="n">confirm_prompt</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">include_subdags</span><span class="o">=</span><span class="n">include_subdags</span><span class="p">,</span>
<span class="n">reset_dag_runs</span><span class="o">=</span><span class="n">reset_dag_runs</span><span class="p">,</span>
<span class="n">dry_run</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">count</span> <span class="o">=</span> <span class="mi">0</span>
<span class="nb">print</span><span class="p">(</span><span class="s2">&quot;Bail. Nothing was cleared.&quot;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">count</span></div>
<div class="viewcode-block" id="DAG.__deepcopy__"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.__deepcopy__">[docs]</a> <span class="k">def</span> <span class="nf">__deepcopy__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">memo</span><span class="p">):</span>
<span class="c1"># Swiwtcharoo to go around deepcopying objects coming through the</span>
<span class="c1"># backdoor</span>
<span class="bp">cls</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span>
<span class="n">result</span> <span class="o">=</span> <span class="bp">cls</span><span class="o">.</span><span class="fm">__new__</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span>
<span class="n">memo</span><span class="p">[</span><span class="nb">id</span><span class="p">(</span><span class="bp">self</span><span class="p">)]</span> <span class="o">=</span> <span class="n">result</span>
<span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">items</span><span class="p">()):</span>
<span class="k">if</span> <span class="n">k</span> <span class="ow">not</span> <span class="ow">in</span> <span class="p">(</span><span class="s1">&#39;user_defined_macros&#39;</span><span class="p">,</span> <span class="s1">&#39;user_defined_filters&#39;</span><span class="p">,</span> <span class="s1">&#39;params&#39;</span><span class="p">):</span>
<span class="nb">setattr</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="n">k</span><span class="p">,</span> <span class="n">copy</span><span class="o">.</span><span class="n">deepcopy</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="n">memo</span><span class="p">))</span>
<span class="n">result</span><span class="o">.</span><span class="n">user_defined_macros</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">user_defined_macros</span>
<span class="n">result</span><span class="o">.</span><span class="n">user_defined_filters</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">user_defined_filters</span>
<span class="n">result</span><span class="o">.</span><span class="n">params</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">params</span>
<span class="k">return</span> <span class="n">result</span></div>
<div class="viewcode-block" id="DAG.sub_dag"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.sub_dag">[docs]</a> <span class="k">def</span> <span class="nf">sub_dag</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task_regex</span><span class="p">,</span> <span class="n">include_downstream</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">include_upstream</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a subset of the current dag as a deep copy of the current dag</span>
<span class="sd"> based on a regex that should match one or many tasks, and includes</span>
<span class="sd"> upstream and downstream neighbours based on the flag passed.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># deep-copying self.task_dict takes a long time, and we don&#39;t want all</span>
<span class="c1"># the tasks anyway, so we copy the tasks manually later</span>
<span class="n">task_dict</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">dag</span> <span class="o">=</span> <span class="n">copy</span><span class="o">.</span><span class="n">deepcopy</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span> <span class="o">=</span> <span class="n">task_dict</span>
<span class="n">regex_match</span> <span class="o">=</span> <span class="p">[</span>
<span class="n">t</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">tasks</span> <span class="k">if</span> <span class="n">re</span><span class="o">.</span><span class="n">findall</span><span class="p">(</span><span class="n">task_regex</span><span class="p">,</span> <span class="n">t</span><span class="o">.</span><span class="n">task_id</span><span class="p">)]</span>
<span class="n">also_include</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">regex_match</span><span class="p">:</span>
<span class="k">if</span> <span class="n">include_downstream</span><span class="p">:</span>
<span class="n">also_include</span> <span class="o">+=</span> <span class="n">t</span><span class="o">.</span><span class="n">get_flat_relatives</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span>
<span class="k">if</span> <span class="n">include_upstream</span><span class="p">:</span>
<span class="n">also_include</span> <span class="o">+=</span> <span class="n">t</span><span class="o">.</span><span class="n">get_flat_relatives</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="c1"># Compiling the unique list of tasks that made the cut</span>
<span class="c1"># Make sure to not recursively deepcopy the dag while copying the task</span>
<span class="n">dag</span><span class="o">.</span><span class="n">task_dict</span> <span class="o">=</span> <span class="p">{</span><span class="n">t</span><span class="o">.</span><span class="n">task_id</span><span class="p">:</span> <span class="n">copy</span><span class="o">.</span><span class="n">deepcopy</span><span class="p">(</span><span class="n">t</span><span class="p">,</span> <span class="p">{</span><span class="nb">id</span><span class="p">(</span><span class="n">t</span><span class="o">.</span><span class="n">dag</span><span class="p">):</span> <span class="n">t</span><span class="o">.</span><span class="n">dag</span><span class="p">})</span>
<span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">regex_match</span> <span class="o">+</span> <span class="n">also_include</span><span class="p">}</span>
<span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">tasks</span><span class="p">:</span>
<span class="c1"># Removing upstream/downstream references to tasks that did not</span>
<span class="c1"># made the cut</span>
<span class="n">t</span><span class="o">.</span><span class="n">_upstream_task_ids</span> <span class="o">=</span> <span class="n">t</span><span class="o">.</span><span class="n">_upstream_task_ids</span><span class="o">.</span><span class="n">intersection</span><span class="p">(</span><span class="n">dag</span><span class="o">.</span><span class="n">task_dict</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span>
<span class="n">t</span><span class="o">.</span><span class="n">_downstream_task_ids</span> <span class="o">=</span> <span class="n">t</span><span class="o">.</span><span class="n">_downstream_task_ids</span><span class="o">.</span><span class="n">intersection</span><span class="p">(</span>
<span class="n">dag</span><span class="o">.</span><span class="n">task_dict</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">dag</span><span class="o">.</span><span class="n">tasks</span><span class="p">)</span> <span class="o">&lt;</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">tasks</span><span class="p">):</span>
<span class="n">dag</span><span class="o">.</span><span class="n">partial</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">return</span> <span class="n">dag</span></div>
<div class="viewcode-block" id="DAG.has_task"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.has_task">[docs]</a> <span class="k">def</span> <span class="nf">has_task</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task_id</span><span class="p">):</span>
<span class="k">return</span> <span class="n">task_id</span> <span class="ow">in</span> <span class="p">(</span><span class="n">t</span><span class="o">.</span><span class="n">task_id</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">tasks</span><span class="p">)</span></div>
<div class="viewcode-block" id="DAG.get_task"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.get_task">[docs]</a> <span class="k">def</span> <span class="nf">get_task</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task_id</span><span class="p">):</span>
<span class="k">if</span> <span class="n">task_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="p">[</span><span class="n">task_id</span><span class="p">]</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">&quot;Task </span><span class="si">{task_id}</span><span class="s2"> not found&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="n">task_id</span><span class="p">))</span></div>
<div class="viewcode-block" id="DAG.pickle_info"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.pickle_info">[docs]</a> <span class="k">def</span> <span class="nf">pickle_info</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">d</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">()</span>
<span class="n">d</span><span class="p">[</span><span class="s1">&#39;is_picklable&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">dttm</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="n">pickled</span> <span class="o">=</span> <span class="n">pickle</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="n">d</span><span class="p">[</span><span class="s1">&#39;pickle_len&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="n">pickled</span><span class="p">)</span>
<span class="n">d</span><span class="p">[</span><span class="s1">&#39;pickling_duration&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> <span class="o">-</span> <span class="n">dttm</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
<span class="n">d</span><span class="p">[</span><span class="s1">&#39;is_picklable&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="kc">False</span>
<span class="n">d</span><span class="p">[</span><span class="s1">&#39;stacktrace&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">traceback</span><span class="o">.</span><span class="n">format_exc</span><span class="p">()</span>
<span class="k">return</span> <span class="n">d</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.pickle"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.pickle">[docs]</a> <span class="k">def</span> <span class="nf">pickle</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">dag</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
<span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span>
<span class="n">dp</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">dag</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">pickle_id</span><span class="p">:</span>
<span class="n">dp</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagPickle</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">DagPickle</span><span class="o">.</span><span class="n">id</span> <span class="o">==</span> <span class="n">dag</span><span class="o">.</span><span class="n">pickle_id</span><span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">dp</span> <span class="ow">or</span> <span class="n">dp</span><span class="o">.</span><span class="n">pickle</span> <span class="o">!=</span> <span class="bp">self</span><span class="p">:</span>
<span class="n">dp</span> <span class="o">=</span> <span class="n">DagPickle</span><span class="p">(</span><span class="n">dag</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">dp</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">last_pickled</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pickle_id</span> <span class="o">=</span> <span class="n">dp</span><span class="o">.</span><span class="n">id</span>
<span class="k">return</span> <span class="n">dp</span></div>
<div class="viewcode-block" id="DAG.tree_view"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.tree_view">[docs]</a> <span class="k">def</span> <span class="nf">tree_view</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Shows an ascii tree representation of the DAG</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">get_downstream</span><span class="p">(</span><span class="n">task</span><span class="p">,</span> <span class="n">level</span><span class="o">=</span><span class="mi">0</span><span class="p">):</span>
<span class="nb">print</span><span class="p">((</span><span class="s2">&quot; &quot;</span> <span class="o">*</span> <span class="n">level</span> <span class="o">*</span> <span class="mi">4</span><span class="p">)</span> <span class="o">+</span> <span class="nb">str</span><span class="p">(</span><span class="n">task</span><span class="p">))</span>
<span class="n">level</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">task</span><span class="o">.</span><span class="n">upstream_list</span><span class="p">:</span>
<span class="n">get_downstream</span><span class="p">(</span><span class="n">t</span><span class="p">,</span> <span class="n">level</span><span class="p">)</span>
<span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">roots</span><span class="p">:</span>
<span class="n">get_downstream</span><span class="p">(</span><span class="n">t</span><span class="p">)</span></div>
<div class="viewcode-block" id="DAG.add_task"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.add_task">[docs]</a> <span class="k">def</span> <span class="nf">add_task</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Add a task to the DAG</span>
<span class="sd"> :param task: the task you want to add</span>
<span class="sd"> :type task: task</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">task</span><span class="o">.</span><span class="n">start_date</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">&quot;Task is missing the start_date parameter&quot;</span><span class="p">)</span>
<span class="c1"># if the task has no start date, assign it the same as the DAG</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="n">task</span><span class="o">.</span><span class="n">start_date</span><span class="p">:</span>
<span class="n">task</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span>
<span class="c1"># otherwise, the task will start on the later of its own start date and</span>
<span class="c1"># the DAG&#39;s start date</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">:</span>
<span class="n">task</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">start_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">)</span>
<span class="c1"># if the task has no end date, assign it the same as the dag</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">task</span><span class="o">.</span><span class="n">end_date</span><span class="p">:</span>
<span class="n">task</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span>
<span class="c1"># otherwise, the task will end on the earlier of its own end date and</span>
<span class="c1"># the DAG&#39;s end date</span>
<span class="k">elif</span> <span class="n">task</span><span class="o">.</span><span class="n">end_date</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span><span class="p">:</span>
<span class="n">task</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="nb">min</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">end_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span><span class="p">)</span>
<span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="p">:</span>
<span class="c1"># TODO: raise an error in Airflow 2.0</span>
<span class="n">warnings</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span>
<span class="s1">&#39;The requested task could not be added to the DAG because a &#39;</span>
<span class="s1">&#39;task with task_id </span><span class="si">{}</span><span class="s1"> is already in the DAG. Starting in &#39;</span>
<span class="s1">&#39;Airflow 2.0, trying to overwrite a task will raise an &#39;</span>
<span class="s1">&#39;exception.&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">),</span>
<span class="n">category</span><span class="o">=</span><span class="ne">PendingDeprecationWarning</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="p">[</span><span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">]</span> <span class="o">=</span> <span class="n">task</span>
<span class="n">task</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="bp">self</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_count</span> <span class="o">=</span> <span class="nb">len</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="p">)</span></div>
<div class="viewcode-block" id="DAG.add_tasks"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.add_tasks">[docs]</a> <span class="k">def</span> <span class="nf">add_tasks</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">tasks</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Add a list of tasks to the DAG</span>
<span class="sd"> :param tasks: a lit of tasks you want to add</span>
<span class="sd"> :type tasks: list of tasks</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">tasks</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">add_task</span><span class="p">(</span><span class="n">task</span><span class="p">)</span></div>
<div class="viewcode-block" id="DAG.run"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.run">[docs]</a> <span class="k">def</span> <span class="nf">run</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">local</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">executor</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">donot_pickle</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">&#39;core&#39;</span><span class="p">,</span> <span class="s1">&#39;donot_pickle&#39;</span><span class="p">),</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_first_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">delay_on_limit_secs</span><span class="o">=</span><span class="mf">1.0</span><span class="p">,</span>
<span class="n">verbose</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">conf</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">rerun_failed_tasks</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">run_backwards</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Runs the DAG.</span>
<span class="sd"> :param start_date: the start date of the range to run</span>
<span class="sd"> :type start_date: datetime.datetime</span>
<span class="sd"> :param end_date: the end date of the range to run</span>
<span class="sd"> :type end_date: datetime.datetime</span>
<span class="sd"> :param mark_success: True to mark jobs as succeeded without running them</span>
<span class="sd"> :type mark_success: bool</span>
<span class="sd"> :param local: True to run the tasks using the LocalExecutor</span>
<span class="sd"> :type local: bool</span>
<span class="sd"> :param executor: The executor instance to run the tasks</span>
<span class="sd"> :type executor: airflow.executor.BaseExecutor</span>
<span class="sd"> :param donot_pickle: True to avoid pickling DAG object and send to workers</span>
<span class="sd"> :type donot_pickle: bool</span>
<span class="sd"> :param ignore_task_deps: True to skip upstream tasks</span>
<span class="sd"> :type ignore_task_deps: bool</span>
<span class="sd"> :param ignore_first_depends_on_past: True to ignore depends_on_past</span>
<span class="sd"> dependencies for the first set of tasks only</span>
<span class="sd"> :type ignore_first_depends_on_past: bool</span>
<span class="sd"> :param pool: Resource pool to use</span>
<span class="sd"> :type pool: str</span>
<span class="sd"> :param delay_on_limit_secs: Time in seconds to wait before next attempt to run</span>
<span class="sd"> dag run when max_active_runs limit has been reached</span>
<span class="sd"> :type delay_on_limit_secs: float</span>
<span class="sd"> :param verbose: Make logging output more verbose</span>
<span class="sd"> :type verbose: bool</span>
<span class="sd"> :param conf: user defined dictionary passed from CLI</span>
<span class="sd"> :type conf: dict</span>
<span class="sd"> :param rerun_failed_tasks:</span>
<span class="sd"> :type: bool</span>
<span class="sd"> :param run_backwards:</span>
<span class="sd"> :type: bool</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">airflow.jobs</span> <span class="k">import</span> <span class="n">BackfillJob</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">executor</span> <span class="ow">and</span> <span class="n">local</span><span class="p">:</span>
<span class="n">executor</span> <span class="o">=</span> <span class="n">LocalExecutor</span><span class="p">()</span>
<span class="k">elif</span> <span class="ow">not</span> <span class="n">executor</span><span class="p">:</span>
<span class="n">executor</span> <span class="o">=</span> <span class="n">get_default_executor</span><span class="p">()</span>
<span class="n">job</span> <span class="o">=</span> <span class="n">BackfillJob</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="n">start_date</span><span class="p">,</span>
<span class="n">end_date</span><span class="o">=</span><span class="n">end_date</span><span class="p">,</span>
<span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span>
<span class="n">executor</span><span class="o">=</span><span class="n">executor</span><span class="p">,</span>
<span class="n">donot_pickle</span><span class="o">=</span><span class="n">donot_pickle</span><span class="p">,</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="n">ignore_task_deps</span><span class="p">,</span>
<span class="n">ignore_first_depends_on_past</span><span class="o">=</span><span class="n">ignore_first_depends_on_past</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span>
<span class="n">delay_on_limit_secs</span><span class="o">=</span><span class="n">delay_on_limit_secs</span><span class="p">,</span>
<span class="n">verbose</span><span class="o">=</span><span class="n">verbose</span><span class="p">,</span>
<span class="n">conf</span><span class="o">=</span><span class="n">conf</span><span class="p">,</span>
<span class="n">rerun_failed_tasks</span><span class="o">=</span><span class="n">rerun_failed_tasks</span><span class="p">,</span>
<span class="n">run_backwards</span><span class="o">=</span><span class="n">run_backwards</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">job</span><span class="o">.</span><span class="n">run</span><span class="p">()</span></div>
<div class="viewcode-block" id="DAG.cli"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.cli">[docs]</a> <span class="k">def</span> <span class="nf">cli</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Exposes a CLI specific to this DAG</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">airflow.bin</span> <span class="k">import</span> <span class="n">cli</span>
<span class="n">parser</span> <span class="o">=</span> <span class="n">cli</span><span class="o">.</span><span class="n">CLIFactory</span><span class="o">.</span><span class="n">get_parser</span><span class="p">(</span><span class="n">dag_parser</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="n">args</span> <span class="o">=</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse_args</span><span class="p">()</span>
<span class="n">args</span><span class="o">.</span><span class="n">func</span><span class="p">(</span><span class="n">args</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.create_dagrun"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.create_dagrun">[docs]</a> <span class="k">def</span> <span class="nf">create_dagrun</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">run_id</span><span class="p">,</span>
<span class="n">state</span><span class="p">,</span>
<span class="n">execution_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">external_trigger</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">conf</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Creates a dag run from this dag including the tasks associated with this dag.</span>
<span class="sd"> Returns the dag run.</span>
<span class="sd"> :param run_id: defines the the run id for this dag run</span>
<span class="sd"> :type run_id: str</span>
<span class="sd"> :param execution_date: the execution date of this dag run</span>
<span class="sd"> :type execution_date: datetime.datetime</span>
<span class="sd"> :param state: the state of the dag run</span>
<span class="sd"> :type state: airflow.utils.state.State</span>
<span class="sd"> :param start_date: the date this dag run should be evaluated</span>
<span class="sd"> :type start_date: datetime</span>
<span class="sd"> :param external_trigger: whether this dag run is externally triggered</span>
<span class="sd"> :type external_trigger: bool</span>
<span class="sd"> :param session: database session</span>
<span class="sd"> :type session: sqlalchemy.orm.session.Session</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">run</span> <span class="o">=</span> <span class="n">DagRun</span><span class="p">(</span>
<span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">run_id</span><span class="o">=</span><span class="n">run_id</span><span class="p">,</span>
<span class="n">execution_date</span><span class="o">=</span><span class="n">execution_date</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="n">start_date</span><span class="p">,</span>
<span class="n">external_trigger</span><span class="o">=</span><span class="n">external_trigger</span><span class="p">,</span>
<span class="n">conf</span><span class="o">=</span><span class="n">conf</span><span class="p">,</span>
<span class="n">state</span><span class="o">=</span><span class="n">state</span>
<span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">run</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="n">run</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="bp">self</span>
<span class="c1"># create the associated task instances</span>
<span class="c1"># state is None at the moment of creation</span>
<span class="n">run</span><span class="o">.</span><span class="n">verify_integrity</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
<span class="n">run</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">()</span>
<span class="k">return</span> <span class="n">run</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.sync_to_db"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.sync_to_db">[docs]</a> <span class="k">def</span> <span class="nf">sync_to_db</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">owner</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">sync_time</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Save attributes about this DAG to the DB. Note that this method</span>
<span class="sd"> can be called for both DAGs and SubDAGs. A SubDag is actually a</span>
<span class="sd"> SubDagOperator.</span>
<span class="sd"> :param dag: the DAG object to save to the DB</span>
<span class="sd"> :type dag: airflow.models.DAG</span>
<span class="sd"> :param sync_time: The time that the DAG should be marked as sync&#39;ed</span>
<span class="sd"> :type sync_time: datetime</span>
<span class="sd"> :return: None</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">owner</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">owner</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">owner</span>
<span class="k">if</span> <span class="n">sync_time</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">sync_time</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="n">orm_dag</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
<span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">orm_dag</span><span class="p">:</span>
<span class="n">orm_dag</span> <span class="o">=</span> <span class="n">DagModel</span><span class="p">(</span><span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_paused_upon_creation</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">orm_dag</span><span class="o">.</span><span class="n">is_paused</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_paused_upon_creation</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Creating ORM DAG for </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
<span class="n">orm_dag</span><span class="o">.</span><span class="n">fileloc</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">parent_dag</span><span class="o">.</span><span class="n">fileloc</span> <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_subdag</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">fileloc</span>
<span class="n">orm_dag</span><span class="o">.</span><span class="n">is_subdag</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_subdag</span>
<span class="n">orm_dag</span><span class="o">.</span><span class="n">owners</span> <span class="o">=</span> <span class="n">owner</span>
<span class="n">orm_dag</span><span class="o">.</span><span class="n">is_active</span> <span class="o">=</span> <span class="kc">True</span>
<span class="n">orm_dag</span><span class="o">.</span><span class="n">last_scheduler_run</span> <span class="o">=</span> <span class="n">sync_time</span>
<span class="n">orm_dag</span><span class="o">.</span><span class="n">default_view</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_default_view</span>
<span class="n">orm_dag</span><span class="o">.</span><span class="n">description</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">description</span>
<span class="n">orm_dag</span><span class="o">.</span><span class="n">schedule_interval</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">schedule_interval</span>
<span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">orm_dag</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="k">for</span> <span class="n">subdag</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">subdags</span><span class="p">:</span>
<span class="n">subdag</span><span class="o">.</span><span class="n">sync_to_db</span><span class="p">(</span><span class="n">owner</span><span class="o">=</span><span class="n">owner</span><span class="p">,</span> <span class="n">sync_time</span><span class="o">=</span><span class="n">sync_time</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span></div>
<span class="nd">@staticmethod</span>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.deactivate_unknown_dags"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.deactivate_unknown_dags">[docs]</a> <span class="k">def</span> <span class="nf">deactivate_unknown_dags</span><span class="p">(</span><span class="n">active_dag_ids</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Given a list of known DAGs, deactivate any other DAGs that are</span>
<span class="sd"> marked as active in the ORM</span>
<span class="sd"> :param active_dag_ids: list of DAG IDs that are active</span>
<span class="sd"> :type active_dag_ids: list[unicode]</span>
<span class="sd"> :return: None</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">active_dag_ids</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="k">return</span>
<span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
<span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="o">~</span><span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">active_dag_ids</span><span class="p">))</span><span class="o">.</span><span class="n">all</span><span class="p">():</span>
<span class="n">dag</span><span class="o">.</span><span class="n">is_active</span> <span class="o">=</span> <span class="kc">False</span>
<span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">dag</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div>
<span class="nd">@staticmethod</span>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.deactivate_stale_dags"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.deactivate_stale_dags">[docs]</a> <span class="k">def</span> <span class="nf">deactivate_stale_dags</span><span class="p">(</span><span class="n">expiration_date</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Deactivate any DAGs that were last touched by the scheduler before</span>
<span class="sd"> the expiration date. These DAGs were likely deleted.</span>
<span class="sd"> :param expiration_date: set inactive DAGs that were touched before this</span>
<span class="sd"> time</span>
<span class="sd"> :type expiration_date: datetime</span>
<span class="sd"> :return: None</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">log</span> <span class="o">=</span> <span class="n">LoggingMixin</span><span class="p">()</span><span class="o">.</span><span class="n">log</span>
<span class="k">for</span> <span class="n">dag</span> <span class="ow">in</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span>
<span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagModel</span><span class="o">.</span><span class="n">last_scheduler_run</span> <span class="o">&lt;</span> <span class="n">expiration_date</span><span class="p">,</span>
<span class="n">DagModel</span><span class="o">.</span><span class="n">is_active</span><span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">():</span>
<span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s2">&quot;Deactivating DAG ID </span><span class="si">%s</span><span class="s2"> since it was last touched by the scheduler at </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span>
<span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">dag</span><span class="o">.</span><span class="n">last_scheduler_run</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
<span class="p">)</span>
<span class="n">dag</span><span class="o">.</span><span class="n">is_active</span> <span class="o">=</span> <span class="kc">False</span>
<span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">dag</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div>
<span class="nd">@staticmethod</span>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DAG.get_num_task_instances"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.get_num_task_instances">[docs]</a> <span class="k">def</span> <span class="nf">get_num_task_instances</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_ids</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">states</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns the number of task instances in the given DAG.</span>
<span class="sd"> :param session: ORM session</span>
<span class="sd"> :param dag_id: ID of the DAG to get the task concurrency of</span>
<span class="sd"> :type dag_id: unicode</span>
<span class="sd"> :param task_ids: A list of valid task IDs for the given DAG</span>
<span class="sd"> :type task_ids: list[unicode]</span>
<span class="sd"> :param states: A list of states to filter by if supplied</span>
<span class="sd"> :type states: list[state]</span>
<span class="sd"> :return: The number of running tasks</span>
<span class="sd"> :rtype: int</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">qry</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">func</span><span class="o">.</span><span class="n">count</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">task_id</span><span class="p">))</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">dag_id</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">task_ids</span><span class="p">:</span>
<span class="n">qry</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">task_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">task_ids</span><span class="p">),</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">states</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">if</span> <span class="kc">None</span> <span class="ow">in</span> <span class="n">states</span><span class="p">:</span>
<span class="n">qry</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">or_</span><span class="p">(</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">state</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">states</span><span class="p">),</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">state</span><span class="o">.</span><span class="n">is_</span><span class="p">(</span><span class="kc">None</span><span class="p">)))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">qry</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">state</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">states</span><span class="p">))</span>
<span class="k">return</span> <span class="n">qry</span><span class="o">.</span><span class="n">scalar</span><span class="p">()</span></div>
<div class="viewcode-block" id="DAG.test_cycle"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG.test_cycle">[docs]</a> <span class="k">def</span> <span class="nf">test_cycle</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Check to see if there are any cycles in the DAG. Returns False if no cycle found,</span>
<span class="sd"> otherwise raises exception.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">airflow.models.dagbag</span> <span class="k">import</span> <span class="n">DagBag</span> <span class="c1"># Avoid circular imports</span>
<span class="c1"># default of int is 0 which corresponds to CYCLE_NEW</span>
<span class="n">visit_map</span> <span class="o">=</span> <span class="n">defaultdict</span><span class="p">(</span><span class="nb">int</span><span class="p">)</span>
<span class="k">for</span> <span class="n">task_id</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="o">.</span><span class="n">keys</span><span class="p">():</span>
<span class="c1"># print(&#39;starting %s&#39; % task_id)</span>
<span class="k">if</span> <span class="n">visit_map</span><span class="p">[</span><span class="n">task_id</span><span class="p">]</span> <span class="o">==</span> <span class="n">DagBag</span><span class="o">.</span><span class="n">CYCLE_NEW</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_test_cycle_helper</span><span class="p">(</span><span class="n">visit_map</span><span class="p">,</span> <span class="n">task_id</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">False</span></div>
<div class="viewcode-block" id="DAG._test_cycle_helper"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DAG._test_cycle_helper">[docs]</a> <span class="k">def</span> <span class="nf">_test_cycle_helper</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">visit_map</span><span class="p">,</span> <span class="n">task_id</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Checks if a cycle exists from the input task using DFS traversal</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">airflow.models.dagbag</span> <span class="k">import</span> <span class="n">DagBag</span> <span class="c1"># Avoid circular imports</span>
<span class="c1"># print(&#39;Inspecting %s&#39; % task_id)</span>
<span class="k">if</span> <span class="n">visit_map</span><span class="p">[</span><span class="n">task_id</span><span class="p">]</span> <span class="o">==</span> <span class="n">DagBag</span><span class="o">.</span><span class="n">CYCLE_DONE</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="n">visit_map</span><span class="p">[</span><span class="n">task_id</span><span class="p">]</span> <span class="o">=</span> <span class="n">DagBag</span><span class="o">.</span><span class="n">CYCLE_IN_PROGRESS</span>
<span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_dict</span><span class="p">[</span><span class="n">task_id</span><span class="p">]</span>
<span class="k">for</span> <span class="n">descendant_id</span> <span class="ow">in</span> <span class="n">task</span><span class="o">.</span><span class="n">get_direct_relative_ids</span><span class="p">():</span>
<span class="k">if</span> <span class="n">visit_map</span><span class="p">[</span><span class="n">descendant_id</span><span class="p">]</span> <span class="o">==</span> <span class="n">DagBag</span><span class="o">.</span><span class="n">CYCLE_IN_PROGRESS</span><span class="p">:</span>
<span class="n">msg</span> <span class="o">=</span> <span class="s2">&quot;Cycle detected in DAG. Faulty task: </span><span class="si">{0}</span><span class="s2"> to </span><span class="si">{1}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">task_id</span><span class="p">,</span> <span class="n">descendant_id</span><span class="p">)</span>
<span class="k">raise</span> <span class="n">AirflowDagCycleException</span><span class="p">(</span><span class="n">msg</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_test_cycle_helper</span><span class="p">(</span><span class="n">visit_map</span><span class="p">,</span> <span class="n">descendant_id</span><span class="p">)</span>
<span class="n">visit_map</span><span class="p">[</span><span class="n">task_id</span><span class="p">]</span> <span class="o">=</span> <span class="n">DagBag</span><span class="o">.</span><span class="n">CYCLE_DONE</span></div></div>
<div class="viewcode-block" id="DagModel"><a class="viewcode-back" href="../../../_api/airflow/models/index.html#airflow.models.dag.DagModel">[docs]</a><span class="k">class</span> <span class="nc">DagModel</span><span class="p">(</span><span class="n">Base</span><span class="p">):</span>
<div class="viewcode-block" id="DagModel.__tablename__"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.__tablename__">[docs]</a> <span class="n">__tablename__</span> <span class="o">=</span> <span class="s2">&quot;dag&quot;</span></div>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> These items are stored in the database for state related information</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="DagModel.dag_id"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.dag_id">[docs]</a> <span class="n">dag_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="n">ID_LEN</span><span class="p">),</span> <span class="n">primary_key</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div>
<span class="c1"># A DAG can be paused from the UI / DB</span>
<span class="c1"># Set this default value of is_paused based on a configuration value!</span>
<div class="viewcode-block" id="DagModel.is_paused_at_creation"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.is_paused_at_creation">[docs]</a> <span class="n">is_paused_at_creation</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">conf</span>\
<span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">&#39;core&#39;</span><span class="p">,</span>
<span class="s1">&#39;dags_are_paused_at_creation&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="DagModel.is_paused"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.is_paused">[docs]</a> <span class="n">is_paused</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Boolean</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="n">is_paused_at_creation</span><span class="p">)</span></div>
<span class="c1"># Whether the DAG is a subdag</span>
<div class="viewcode-block" id="DagModel.is_subdag"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.is_subdag">[docs]</a> <span class="n">is_subdag</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Boolean</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div>
<span class="c1"># Whether that DAG was seen on the last DagBag load</span>
<div class="viewcode-block" id="DagModel.is_active"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.is_active">[docs]</a> <span class="n">is_active</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Boolean</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div>
<span class="c1"># Last time the scheduler started</span>
<div class="viewcode-block" id="DagModel.last_scheduler_run"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.last_scheduler_run">[docs]</a> <span class="n">last_scheduler_run</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">UtcDateTime</span><span class="p">)</span></div>
<span class="c1"># Last time this DAG was pickled</span>
<div class="viewcode-block" id="DagModel.last_pickled"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.last_pickled">[docs]</a> <span class="n">last_pickled</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">UtcDateTime</span><span class="p">)</span></div>
<span class="c1"># Time when the DAG last received a refresh signal</span>
<span class="c1"># (e.g. the DAG&#39;s &quot;refresh&quot; button was clicked in the web UI)</span>
<div class="viewcode-block" id="DagModel.last_expired"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.last_expired">[docs]</a> <span class="n">last_expired</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">UtcDateTime</span><span class="p">)</span></div>
<span class="c1"># Whether (one of) the scheduler is scheduling this DAG at the moment</span>
<div class="viewcode-block" id="DagModel.scheduler_lock"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.scheduler_lock">[docs]</a> <span class="n">scheduler_lock</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Boolean</span><span class="p">)</span></div>
<span class="c1"># Foreign key to the latest pickle_id</span>
<div class="viewcode-block" id="DagModel.pickle_id"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.pickle_id">[docs]</a> <span class="n">pickle_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span></div>
<span class="c1"># The location of the file containing the DAG object</span>
<span class="c1"># Note: Do not depend on fileloc pointing to a file; in the case of a</span>
<span class="c1"># packaged DAG, it will point to the subpath of the DAG within the</span>
<span class="c1"># associated zip.</span>
<div class="viewcode-block" id="DagModel.fileloc"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.fileloc">[docs]</a> <span class="n">fileloc</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">2000</span><span class="p">))</span></div>
<span class="c1"># String representing the owners</span>
<div class="viewcode-block" id="DagModel.owners"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.owners">[docs]</a> <span class="n">owners</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">2000</span><span class="p">))</span></div>
<span class="c1"># Description of the dag</span>
<div class="viewcode-block" id="DagModel.description"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.description">[docs]</a> <span class="n">description</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Text</span><span class="p">)</span></div>
<span class="c1"># Default view of the inside the webserver</span>
<div class="viewcode-block" id="DagModel.default_view"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.default_view">[docs]</a> <span class="n">default_view</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">25</span><span class="p">))</span></div>
<span class="c1"># Schedule interval</span>
<div class="viewcode-block" id="DagModel.schedule_interval"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.schedule_interval">[docs]</a> <span class="n">schedule_interval</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Interval</span><span class="p">)</span></div>
<div class="viewcode-block" id="DagModel.__repr__"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.__repr__">[docs]</a> <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s2">&quot;&lt;DAG: </span><span class="si">{self.dag_id}</span><span class="s2">&gt;&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="DagModel.timezone"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.timezone">[docs]</a> <span class="k">def</span> <span class="nf">timezone</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="n">settings</span><span class="o">.</span><span class="n">TIMEZONE</span></div>
<span class="nd">@staticmethod</span>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DagModel.get_dagmodel"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.get_dagmodel">[docs]</a> <span class="k">def</span> <span class="nf">get_dagmodel</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">return</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">dag_id</span><span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span></div>
<span class="nd">@classmethod</span>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DagModel.get_current"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.get_current">[docs]</a> <span class="k">def</span> <span class="nf">get_current</span><span class="p">(</span><span class="bp">cls</span><span class="p">,</span> <span class="n">dag_id</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="k">return</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="bp">cls</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">dag_id</span><span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span></div>
<div class="viewcode-block" id="DagModel.get_default_view"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.get_default_view">[docs]</a> <span class="k">def</span> <span class="nf">get_default_view</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_view</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;webserver&#39;</span><span class="p">,</span> <span class="s1">&#39;dag_default_view&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">default_view</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DagModel.get_last_dagrun"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.get_last_dagrun">[docs]</a> <span class="k">def</span> <span class="nf">get_last_dagrun</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">include_externally_triggered</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="k">return</span> <span class="n">get_last_dagrun</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span>
<span class="n">include_externally_triggered</span><span class="o">=</span><span class="n">include_externally_triggered</span><span class="p">)</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="DagModel.safe_dag_id"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.safe_dag_id">[docs]</a> <span class="k">def</span> <span class="nf">safe_dag_id</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;.&#39;</span><span class="p">,</span> <span class="s1">&#39;__dot__&#39;</span><span class="p">)</span></div>
<div class="viewcode-block" id="DagModel.get_dag"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.get_dag">[docs]</a> <span class="k">def</span> <span class="nf">get_dag</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="n">DagBag</span><span class="p">(</span><span class="n">dag_folder</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">fileloc</span><span class="p">)</span><span class="o">.</span><span class="n">get_dag</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DagModel.create_dagrun"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.create_dagrun">[docs]</a> <span class="k">def</span> <span class="nf">create_dagrun</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">run_id</span><span class="p">,</span>
<span class="n">state</span><span class="p">,</span>
<span class="n">execution_date</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">external_trigger</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">conf</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Creates a dag run from this dag including the tasks associated with this dag.</span>
<span class="sd"> Returns the dag run.</span>
<span class="sd"> :param run_id: defines the the run id for this dag run</span>
<span class="sd"> :type run_id: str</span>
<span class="sd"> :param execution_date: the execution date of this dag run</span>
<span class="sd"> :type execution_date: datetime.datetime</span>
<span class="sd"> :param state: the state of the dag run</span>
<span class="sd"> :type state: airflow.utils.state.State</span>
<span class="sd"> :param start_date: the date this dag run should be evaluated</span>
<span class="sd"> :type start_date: datetime.datetime</span>
<span class="sd"> :param external_trigger: whether this dag run is externally triggered</span>
<span class="sd"> :type external_trigger: bool</span>
<span class="sd"> :param session: database session</span>
<span class="sd"> :type session: sqlalchemy.orm.session.Session</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_dag</span><span class="p">()</span><span class="o">.</span><span class="n">create_dagrun</span><span class="p">(</span><span class="n">run_id</span><span class="o">=</span><span class="n">run_id</span><span class="p">,</span>
<span class="n">state</span><span class="o">=</span><span class="n">state</span><span class="p">,</span>
<span class="n">execution_date</span><span class="o">=</span><span class="n">execution_date</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="n">start_date</span><span class="p">,</span>
<span class="n">external_trigger</span><span class="o">=</span><span class="n">external_trigger</span><span class="p">,</span>
<span class="n">conf</span><span class="o">=</span><span class="n">conf</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="DagModel.set_is_paused"><a class="viewcode-back" href="../../../_api/airflow/models/dag/index.html#airflow.models.dag.DagModel.set_is_paused">[docs]</a> <span class="k">def</span> <span class="nf">set_is_paused</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">is_paused</span><span class="p">,</span> <span class="c1"># type: bool</span>
<span class="n">including_subdags</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="c1"># type: bool</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="p">):</span>
<span class="c1"># type: (...) -&gt; None</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Pause/Un-pause a DAG.</span>
<span class="sd"> :param is_paused: Is the DAG paused</span>
<span class="sd"> :param including_subdags: whether to include the DAG&#39;s subdags</span>
<span class="sd"> :param session: session</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">dag_ids</span> <span class="o">=</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">]</span> <span class="c1"># type: List[str]</span>
<span class="k">if</span> <span class="n">including_subdags</span><span class="p">:</span>
<span class="n">subdags</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_dag</span><span class="p">()</span><span class="o">.</span><span class="n">subdags</span>
<span class="n">dag_ids</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="n">subdag</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">subdag</span> <span class="ow">in</span> <span class="n">subdags</span><span class="p">])</span>
<span class="n">dag_models</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagModel</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">DagModel</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">dag_ids</span><span class="p">))</span><span class="o">.</span><span class="n">all</span><span class="p">()</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">for</span> <span class="n">dag_model</span> <span class="ow">in</span> <span class="n">dag_models</span><span class="p">:</span>
<span class="n">dag_model</span><span class="o">.</span><span class="n">is_paused</span> <span class="o">=</span> <span class="n">is_paused</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span>
<span class="n">session</span><span class="o">.</span><span class="n">rollback</span><span class="p">()</span>
<span class="k">raise</span></div></div>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
</p>
</div>
Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>