<!-- blob: 513aa2ba839ae9361af700d59db64cd66c61c217 [file] [log] [blame] -->
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<script>
// Once the DOM is ready, rewrite issue and pull-request references inside the
// element with id "changelog" (if the page has one) into hyperlinks.
document.addEventListener('DOMContentLoaded', function () {
  var changelog = document.getElementById('changelog');
  if (changelog === null) {
    // No such element on this page: nothing to rewrite.
    return;
  }

  // Rules are applied in order; innerHTML is re-read and re-assigned for
  // every rule, exactly one pass per rule.
  var linkRules = [
    // [AIRFLOW-...]  ->  link to the Apache JIRA issue
    {
      pattern: /\[(AIRFLOW-[\d]+)\]/g,
      replacement: `<a href="https://issues.apache.org/jira/browse/$1">[$1]</a>`
    },
    // (#...)  ->  link to the GitHub pull request
    {
      pattern: /\(#([\d]+)\)/g,
      replacement: `<a href="https://github.com/apache/airflow/pull/$1">(#$1)</a>`
    }
  ];

  linkRules.forEach(function (rule) {
    changelog.innerHTML = changelog.innerHTML.replace(rule.pattern, rule.replacement);
  });
});
</script>
<style>
  /* ---- Example header: coloured title bar placed above a code example ---- */
  .example-header {
    position: relative; /* positioning context for .example-header-button */
    background: #9AAA7A;
    padding: 8px 16px;
    margin-bottom: 0;
  }

  /* Modifier with extra right padding; presumably reserves room for the
     absolutely positioned button below. */
  .example-header--with-button {
    padding-right: 166px;
  }

  /* Clearfix so the header wraps any floated children. */
  .example-header:after {
    content: '';
    display: table;
    clear: both;
  }

  .example-title {
    display: block;
    padding: 4px;
    margin-right: 16px;
    color: white;
    overflow-x: auto; /* long titles scroll horizontally instead of overflowing */
  }

  /* Pinned to the top-right corner of .example-header. */
  .example-header-button {
    top: 8px;
    right: 16px;
    position: absolute;
  }

  /* Remove the gap between the header and the highlighted block that follows it. */
  .example-header + .highlight-python {
    margin-top: 0 !important;
  }

  /* ---- View-code button ---- */
  .viewcode-button {
    display: inline-block;
    padding: 8px 16px;
    border: 0;
    margin: 0;
    /* Default outline is removed here; a visible keyboard-focus outline is
       restored by the :focus-visible rule further down. */
    outline: 0;
    border-radius: 2px;
    /* Prefixed and standard shadows now use the same 6px blur (the prefixed
       declaration previously said 5px). */
    -webkit-box-shadow: 0 3px 6px 0 rgba(0,0,0,.3);
    box-shadow: 0 3px 6px 0 rgba(0,0,0,.3);
    color: #404040;
    background-color: #e7e7e7;
    cursor: pointer;
    font-size: 16px;
    font-weight: 500;
    line-height: 1;
    text-decoration: none;
    text-overflow: ellipsis;
    overflow: hidden;
    text-transform: uppercase;
    -webkit-transition: background-color .2s;
    transition: background-color .2s;
    vertical-align: middle;
    white-space: nowrap;
  }

  /* Keep the text colour the same for visited links. */
  .viewcode-button:visited {
    color: #404040;
  }

  .viewcode-button:hover, .viewcode-button:focus {
    color: #404040;
    background-color: #d6d6d6;
  }

  /* Visible focus ring for keyboard users. Kept as a separate rule so that
     browsers without :focus-visible support drop only this rule. */
  .viewcode-button:focus-visible {
    outline: 2px solid #404040;
    outline-offset: 2px;
  }
</style>
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
  <!-- Sidebar: project home link, version, search box and top-level table of contents. -->
  <div class="wy-side-scroll">
    <div class="wy-side-nav-search" >
      <a href="../../../index.html" class="icon icon-home"> Airflow
      </a>
      <div class="version">
        1.10.4
      </div>
      <div role="search">
        <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
          <!-- The placeholder alone does not give the field an accessible
               name, so aria-label supplies one for assistive technology. -->
          <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" />
          <input type="hidden" name="check_keywords" value="yes" />
          <input type="hidden" name="area" value="default" />
        </form>
      </div>
    </div>
    <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
      <!-- Top-level documentation sections; all links are relative to the docs root. -->
      <ul>
        <li class="toctree-l1"><a class="reference internal" href="../../../project.html">Project</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../license.html">License</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../start.html">Quick Start</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../installation.html">Installation</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../tutorial.html">Tutorial</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../howto/index.html">How-to Guides</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../ui.html">UI / Screenshots</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../concepts.html">Concepts</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../profiling.html">Data Profiling</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../cli.html">Command Line Interface</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../scheduler.html">Scheduling &amp; Triggers</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../plugins.html">Plugins</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../security.html">Security</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../timezone.html">Time zones</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../api.html">Experimental Rest API</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../integration.html">Integration</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../metrics.html">Metrics</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../kubernetes.html">Kubernetes</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../lineage.html">Lineage</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../changelog.html">Changelog</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../faq.html">FAQ</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../macros.html">Macros reference</a></li>
        <li class="toctree-l1"><a class="reference internal" href="../../../_api/index.html">API Reference</a></li>
      </ul>
    </div>
  </div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
  <!-- Top bar: an icon element carrying the wy-nav-top toggle hook, followed
       by a link back to the documentation home page. -->
  <i class="fa fa-bars" data-toggle="wy-nav-top"></i>
  <a href="../../../index.html">Airflow</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
  <!-- Breadcrumb trail: Docs » Module code » airflow.models » current module. -->
  <ul class="wy-breadcrumbs">
    <li><a href="../../../index.html">Docs</a> &raquo;</li>
    <li><a href="../../index.html">Module code</a> &raquo;</li>
    <li><a href="../models.html">airflow.models</a> &raquo;</li>
    <!-- Current page: plain text, not a link. -->
    <li>airflow.models.taskinstance</li>
    <!-- Aside slot, empty on this page. -->
    <li class="wy-breadcrumbs-aside">
    </li>
  </ul>
  <hr>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for airflow.models.taskinstance</h1><div class="highlight"><pre>
<span></span><span class="c1"># -*- coding: utf-8 -*-</span>
<span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span>
<span class="c1"># or more contributor license agreements. See the NOTICE file</span>
<span class="c1"># distributed with this work for additional information</span>
<span class="c1"># regarding copyright ownership. The ASF licenses this file</span>
<span class="c1"># to you under the Apache License, Version 2.0 (the</span>
<span class="c1"># &quot;License&quot;); you may not use this file except in compliance</span>
<span class="c1"># with the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing,</span>
<span class="c1"># software distributed under the License is distributed on an</span>
<span class="c1"># &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
<span class="c1"># KIND, either express or implied. See the License for the</span>
<span class="c1"># specific language governing permissions and limitations</span>
<span class="c1"># under the License.</span>
<span class="kn">import</span> <span class="nn">copy</span>
<span class="kn">import</span> <span class="nn">functools</span>
<span class="kn">import</span> <span class="nn">getpass</span>
<span class="kn">import</span> <span class="nn">hashlib</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">math</span>
<span class="kn">import</span> <span class="nn">os</span>
<span class="kn">import</span> <span class="nn">signal</span>
<span class="kn">import</span> <span class="nn">time</span>
<span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">timedelta</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="k">import</span> <span class="n">Optional</span>
<span class="kn">from</span> <span class="nn">urllib.parse</span> <span class="k">import</span> <span class="n">quote</span>
<span class="kn">import</span> <span class="nn">dill</span>
<span class="kn">import</span> <span class="nn">lazy_object_proxy</span>
<span class="kn">import</span> <span class="nn">pendulum</span>
<span class="kn">from</span> <span class="nn">six.moves.urllib.parse</span> <span class="k">import</span> <span class="n">quote_plus</span>
<span class="kn">from</span> <span class="nn">sqlalchemy</span> <span class="k">import</span> <span class="n">Column</span><span class="p">,</span> <span class="n">String</span><span class="p">,</span> <span class="n">Float</span><span class="p">,</span> <span class="n">Integer</span><span class="p">,</span> <span class="n">PickleType</span><span class="p">,</span> <span class="n">Index</span><span class="p">,</span> <span class="n">func</span>
<span class="kn">from</span> <span class="nn">sqlalchemy.orm</span> <span class="k">import</span> <span class="n">reconstructor</span>
<span class="kn">from</span> <span class="nn">sqlalchemy.orm.session</span> <span class="k">import</span> <span class="n">Session</span>
<span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">configuration</span><span class="p">,</span> <span class="n">settings</span>
<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="p">(</span>
<span class="n">AirflowException</span><span class="p">,</span> <span class="n">AirflowTaskTimeout</span><span class="p">,</span> <span class="n">AirflowSkipException</span><span class="p">,</span> <span class="n">AirflowRescheduleException</span>
<span class="p">)</span>
<span class="kn">from</span> <span class="nn">airflow.models.base</span> <span class="k">import</span> <span class="n">Base</span><span class="p">,</span> <span class="n">ID_LEN</span>
<span class="kn">from</span> <span class="nn">airflow.models.log</span> <span class="k">import</span> <span class="n">Log</span>
<span class="kn">from</span> <span class="nn">airflow.models.pool</span> <span class="k">import</span> <span class="n">Pool</span>
<span class="kn">from</span> <span class="nn">airflow.models.taskfail</span> <span class="k">import</span> <span class="n">TaskFail</span>
<span class="kn">from</span> <span class="nn">airflow.models.taskreschedule</span> <span class="k">import</span> <span class="n">TaskReschedule</span>
<span class="kn">from</span> <span class="nn">airflow.models.variable</span> <span class="k">import</span> <span class="n">Variable</span>
<span class="kn">from</span> <span class="nn">airflow.models.xcom</span> <span class="k">import</span> <span class="n">XCom</span><span class="p">,</span> <span class="n">XCOM_RETURN_KEY</span>
<span class="kn">from</span> <span class="nn">airflow.settings</span> <span class="k">import</span> <span class="n">Stats</span>
<span class="kn">from</span> <span class="nn">airflow.ti_deps.dep_context</span> <span class="k">import</span> <span class="n">DepContext</span><span class="p">,</span> <span class="n">QUEUE_DEPS</span><span class="p">,</span> <span class="n">RUN_DEPS</span>
<span class="kn">from</span> <span class="nn">airflow.utils</span> <span class="k">import</span> <span class="n">timezone</span>
<span class="kn">from</span> <span class="nn">airflow.utils.db</span> <span class="k">import</span> <span class="n">provide_session</span>
<span class="kn">from</span> <span class="nn">airflow.utils.email</span> <span class="k">import</span> <span class="n">send_email</span>
<span class="kn">from</span> <span class="nn">airflow.utils.helpers</span> <span class="k">import</span> <span class="n">is_container</span>
<span class="kn">from</span> <span class="nn">airflow.utils.log.logging_mixin</span> <span class="k">import</span> <span class="n">LoggingMixin</span>
<span class="kn">from</span> <span class="nn">airflow.utils.net</span> <span class="k">import</span> <span class="n">get_hostname</span>
<span class="kn">from</span> <span class="nn">airflow.utils.sqlalchemy</span> <span class="k">import</span> <span class="n">UtcDateTime</span>
<span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span>
<span class="kn">from</span> <span class="nn">airflow.utils.timeout</span> <span class="k">import</span> <span class="n">timeout</span>
<div class="viewcode-block" id="clear_task_instances"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.clear_task_instances">[docs]</a><span class="k">def</span> <span class="nf">clear_task_instances</span><span class="p">(</span><span class="n">tis</span><span class="p">,</span>
<span class="n">session</span><span class="p">,</span>
<span class="n">activate_dag_runs</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">dag</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Clears a set of task instances, but makes sure the running ones</span>
<span class="sd"> get killed.</span>
<span class="sd"> :param tis: a list of task instances</span>
<span class="sd"> :param session: current session</span>
<span class="sd"> :param activate_dag_runs: flag to check for active dag run</span>
<span class="sd"> :param dag: DAG object</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">job_ids</span> <span class="o">=</span> <span class="p">[]</span>
<span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">:</span>
<span class="k">if</span> <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">:</span>
<span class="k">if</span> <span class="n">ti</span><span class="o">.</span><span class="n">job_id</span><span class="p">:</span>
<span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SHUTDOWN</span>
<span class="n">job_ids</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">job_id</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">task_id</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">task_id</span>
<span class="k">if</span> <span class="n">dag</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">has_task</span><span class="p">(</span><span class="n">task_id</span><span class="p">):</span>
<span class="n">task</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">task_id</span><span class="p">)</span>
<span class="n">task_retries</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">retries</span>
<span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">try_number</span> <span class="o">+</span> <span class="n">task_retries</span> <span class="o">-</span> <span class="mi">1</span>
<span class="k">else</span><span class="p">:</span>
<span class="c1"># Ignore errors when updating max_tries if dag is None or</span>
<span class="c1"># task not found in dag since database records could be</span>
<span class="c1"># outdated. We make max_tries the maximum value of its</span>
<span class="c1"># original max_tries or the current task try number.</span>
<span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span><span class="p">,</span> <span class="n">ti</span><span class="o">.</span><span class="n">try_number</span> <span class="o">-</span> <span class="mi">1</span><span class="p">)</span>
<span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">NONE</span>
<span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span>
<span class="k">if</span> <span class="n">job_ids</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">airflow.jobs</span> <span class="k">import</span> <span class="n">BaseJob</span> <span class="k">as</span> <span class="n">BJ</span>
<span class="k">for</span> <span class="n">job</span> <span class="ow">in</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">BJ</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">BJ</span><span class="o">.</span><span class="n">id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">job_ids</span><span class="p">))</span><span class="o">.</span><span class="n">all</span><span class="p">():</span>
<span class="n">job</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SHUTDOWN</span>
<span class="k">if</span> <span class="n">activate_dag_runs</span> <span class="ow">and</span> <span class="n">tis</span><span class="p">:</span>
<span class="kn">from</span> <span class="nn">airflow.models.dagrun</span> <span class="k">import</span> <span class="n">DagRun</span> <span class="c1"># Avoid circular import</span>
<span class="n">drs</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagRun</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">DagRun</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">in_</span><span class="p">({</span><span class="n">ti</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}),</span>
<span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">in_</span><span class="p">({</span><span class="n">ti</span><span class="o">.</span><span class="n">execution_date</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}),</span>
<span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span>
<span class="k">for</span> <span class="n">dr</span> <span class="ow">in</span> <span class="n">drs</span><span class="p">:</span>
<span class="n">dr</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span>
<span class="n">dr</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span></div>
<div class="viewcode-block" id="TaskInstance"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance">[docs]</a><span class="k">class</span> <span class="nc">TaskInstance</span><span class="p">(</span><span class="n">Base</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Task instances store the state of a task instance. This table is the</span>
<span class="sd"> authority and single source of truth around what tasks have run and the</span>
<span class="sd"> state they are in.</span>
<span class="sd"> The SqlAlchemy model doesn&#39;t have a SqlAlchemy foreign key to the task or</span>
<span class="sd"> dag model deliberately to have more control over transactions.</span>
<span class="sd"> Database transactions on this table should insure double triggers and</span>
<span class="sd"> any confusion around what task instances are or aren&#39;t ready to run</span>
<span class="sd"> even while multiple schedulers may be firing task instances.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="TaskInstance.__tablename__"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.__tablename__">[docs]</a> <span class="n">__tablename__</span> <span class="o">=</span> <span class="s2">&quot;task_instance&quot;</span></div>
<div class="viewcode-block" id="TaskInstance.task_id"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.task_id">[docs]</a> <span class="n">task_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="n">ID_LEN</span><span class="p">),</span> <span class="n">primary_key</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.dag_id"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.dag_id">[docs]</a> <span class="n">dag_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="n">ID_LEN</span><span class="p">),</span> <span class="n">primary_key</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.execution_date"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.execution_date">[docs]</a> <span class="n">execution_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">UtcDateTime</span><span class="p">,</span> <span class="n">primary_key</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.start_date"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.start_date">[docs]</a> <span class="n">start_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">UtcDateTime</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.end_date"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.end_date">[docs]</a> <span class="n">end_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">UtcDateTime</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.duration"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.duration">[docs]</a> <span class="n">duration</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Float</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.state"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.state">[docs]</a> <span class="n">state</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">20</span><span class="p">))</span></div>
<div class="viewcode-block" id="TaskInstance._try_number"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance._try_number">[docs]</a> <span class="n">_try_number</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="s1">&#39;try_number&#39;</span><span class="p">,</span> <span class="n">Integer</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.max_tries"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.max_tries">[docs]</a> <span class="n">max_tries</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.hostname"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.hostname">[docs]</a> <span class="n">hostname</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">1000</span><span class="p">))</span></div>
<div class="viewcode-block" id="TaskInstance.unixname"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.unixname">[docs]</a> <span class="n">unixname</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">1000</span><span class="p">))</span></div>
<div class="viewcode-block" id="TaskInstance.job_id"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.job_id">[docs]</a> <span class="n">job_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.pool"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.pool">[docs]</a> <span class="n">pool</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">50</span><span class="p">),</span> <span class="n">nullable</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.queue"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.queue">[docs]</a> <span class="n">queue</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">256</span><span class="p">))</span></div>
<div class="viewcode-block" id="TaskInstance.priority_weight"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.priority_weight">[docs]</a> <span class="n">priority_weight</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.operator"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.operator">[docs]</a> <span class="n">operator</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">1000</span><span class="p">))</span></div>
<div class="viewcode-block" id="TaskInstance.queued_dttm"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.queued_dttm">[docs]</a> <span class="n">queued_dttm</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">UtcDateTime</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.pid"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.pid">[docs]</a> <span class="n">pid</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.executor_config"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.executor_config">[docs]</a> <span class="n">executor_config</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">PickleType</span><span class="p">(</span><span class="n">pickler</span><span class="o">=</span><span class="n">dill</span><span class="p">))</span></div>
<div class="viewcode-block" id="TaskInstance.__table_args__"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.__table_args__">[docs]</a> <span class="n">__table_args__</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">Index</span><span class="p">(</span><span class="s1">&#39;ti_dag_state&#39;</span><span class="p">,</span> <span class="n">dag_id</span><span class="p">,</span> <span class="n">state</span><span class="p">),</span>
<span class="n">Index</span><span class="p">(</span><span class="s1">&#39;ti_dag_date&#39;</span><span class="p">,</span> <span class="n">dag_id</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">),</span>
<span class="n">Index</span><span class="p">(</span><span class="s1">&#39;ti_state&#39;</span><span class="p">,</span> <span class="n">state</span><span class="p">),</span>
<span class="n">Index</span><span class="p">(</span><span class="s1">&#39;ti_state_lkp&#39;</span><span class="p">,</span> <span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">,</span> <span class="n">state</span><span class="p">),</span>
<span class="n">Index</span><span class="p">(</span><span class="s1">&#39;ti_pool&#39;</span><span class="p">,</span> <span class="n">pool</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">priority_weight</span><span class="p">),</span>
<span class="n">Index</span><span class="p">(</span><span class="s1">&#39;ti_job_id&#39;</span><span class="p">,</span> <span class="n">job_id</span><span class="p">),</span></div>
<span class="p">)</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">,</span> <span class="n">state</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">dag_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task</span> <span class="o">=</span> <span class="n">task</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_log</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s2">&quot;airflow.task&quot;</span><span class="p">)</span>
<span class="c1"># make sure we have a localized execution_date stored in UTC</span>
<span class="k">if</span> <span class="n">execution_date</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">timezone</span><span class="o">.</span><span class="n">is_localized</span><span class="p">(</span><span class="n">execution_date</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">&quot;execution date </span><span class="si">%s</span><span class="s2"> has no timezone information. Using &quot;</span>
<span class="s2">&quot;default from dag or system&quot;</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span>
<span class="n">execution_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">make_aware</span><span class="p">(</span><span class="n">execution_date</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">timezone</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">execution_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">make_aware</span><span class="p">(</span><span class="n">execution_date</span><span class="p">)</span>
<span class="n">execution_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="n">execution_date</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">=</span> <span class="n">execution_date</span>
<span class="bp">self</span><span class="o">.</span><span class="n">queue</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">queue</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pool</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">pool</span>
<span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">priority_weight_total</span>
<span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">=</span> <span class="mi">0</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retries</span>
<span class="bp">self</span><span class="o">.</span><span class="n">unixname</span> <span class="o">=</span> <span class="n">getpass</span><span class="o">.</span><span class="n">getuser</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">run_as_user</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">run_as_user</span>
<span class="k">if</span> <span class="n">state</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span>
<span class="bp">self</span><span class="o">.</span><span class="n">hostname</span> <span class="o">=</span> <span class="s1">&#39;&#39;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">executor_config</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">executor_config</span>
<span class="bp">self</span><span class="o">.</span><span class="n">init_on_load</span><span class="p">()</span>
<span class="c1"># Is this TaskInstance currently running within `airflow run --raw`.</span>
<span class="c1"># Not persisted to the database so only valid for the current process</span>
<span class="bp">self</span><span class="o">.</span><span class="n">raw</span> <span class="o">=</span> <span class="kc">False</span>
<span class="nd">@reconstructor</span>
<div class="viewcode-block" id="TaskInstance.init_on_load"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.init_on_load">[docs]</a> <span class="k">def</span> <span class="nf">init_on_load</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot; Initialize the attributes that aren&#39;t stored in the DB. &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_mode</span> <span class="o">=</span> <span class="kc">False</span> <span class="c1"># can be changed when calling &#39;run&#39;</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="TaskInstance.try_number"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.try_number">[docs]</a> <span class="k">def</span> <span class="nf">try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Return the try number that this task instance will have when it is actually</span>
<span class="sd"> run.</span>
<span class="sd"> If the TI is currently running, this will match the column in the</span>
<span class="sd"> database; in all other cases this will be incremented.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># This is designed so that task logs end up in the right file.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">+</span> <span class="mi">1</span></div>
<span class="nd">@try_number</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">=</span> <span class="n">value</span>
<span class="nd">@property</span>
<div class="viewcode-block" id="TaskInstance.next_try_number"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.next_try_number">[docs]</a> <span class="k">def</span> <span class="nf">next_try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">+</span> <span class="mi">1</span></div>
<div class="viewcode-block" id="TaskInstance.command"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.command">[docs]</a> <span class="k">def</span> <span class="nf">command</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_all_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">local</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">pickle_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">raw</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">cfg_path</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a command that can be executed anywhere where airflow is</span>
<span class="sd"> installed. This command is part of the message sent to executors by</span>
<span class="sd"> the orchestrator.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="s2">&quot; &quot;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">command_as_list</span><span class="p">(</span>
<span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span>
<span class="n">ignore_all_deps</span><span class="o">=</span><span class="n">ignore_all_deps</span><span class="p">,</span>
<span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="n">ignore_task_deps</span><span class="p">,</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">,</span>
<span class="n">local</span><span class="o">=</span><span class="n">local</span><span class="p">,</span>
<span class="n">pickle_id</span><span class="o">=</span><span class="n">pickle_id</span><span class="p">,</span>
<span class="n">raw</span><span class="o">=</span><span class="n">raw</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="n">job_id</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span>
<span class="n">cfg_path</span><span class="o">=</span><span class="n">cfg_path</span><span class="p">))</span></div>
<div class="viewcode-block" id="TaskInstance.command_as_list"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.command_as_list">[docs]</a> <span class="k">def</span> <span class="nf">command_as_list</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_all_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">local</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">pickle_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">raw</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">cfg_path</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a command that can be executed anywhere where airflow is</span>
<span class="sd"> installed. This command is part of the message sent to executors by</span>
<span class="sd"> the orchestrator.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">dag</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span>
<span class="n">should_pass_filepath</span> <span class="o">=</span> <span class="ow">not</span> <span class="n">pickle_id</span> <span class="ow">and</span> <span class="n">dag</span>
<span class="k">if</span> <span class="n">should_pass_filepath</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">!=</span> <span class="n">dag</span><span class="o">.</span><span class="n">filepath</span><span class="p">:</span>
<span class="n">path</span> <span class="o">=</span> <span class="s2">&quot;DAGS_FOLDER/</span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dag</span><span class="o">.</span><span class="n">filepath</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">should_pass_filepath</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span><span class="p">:</span>
<span class="n">path</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">path</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">return</span> <span class="n">TaskInstance</span><span class="o">.</span><span class="n">generate_command</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span>
<span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span>
<span class="n">ignore_all_deps</span><span class="o">=</span><span class="n">ignore_all_deps</span><span class="p">,</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="n">ignore_task_deps</span><span class="p">,</span>
<span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">,</span>
<span class="n">local</span><span class="o">=</span><span class="n">local</span><span class="p">,</span>
<span class="n">pickle_id</span><span class="o">=</span><span class="n">pickle_id</span><span class="p">,</span>
<span class="n">file_path</span><span class="o">=</span><span class="n">path</span><span class="p">,</span>
<span class="n">raw</span><span class="o">=</span><span class="n">raw</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="n">job_id</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span>
<span class="n">cfg_path</span><span class="o">=</span><span class="n">cfg_path</span><span class="p">)</span></div>
<span class="nd">@staticmethod</span>
<div class="viewcode-block" id="TaskInstance.generate_command"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.generate_command">[docs]</a> <span class="k">def</span> <span class="nf">generate_command</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">task_id</span><span class="p">,</span>
<span class="n">execution_date</span><span class="p">,</span>
<span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_all_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">local</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">pickle_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">file_path</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">raw</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">cfg_path</span><span class="o">=</span><span class="kc">None</span>
<span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Generates the shell command required to execute this task instance.</span>
<span class="sd"> :param dag_id: DAG ID</span>
<span class="sd"> :type dag_id: unicode</span>
<span class="sd"> :param task_id: Task ID</span>
<span class="sd"> :type task_id: unicode</span>
<span class="sd"> :param execution_date: Execution date for the task</span>
<span class="sd"> :type execution_date: datetime.datetime</span>
<span class="sd"> :param mark_success: Whether to mark the task as successful</span>
<span class="sd"> :type mark_success: bool</span>
<span class="sd"> :param ignore_all_deps: Ignore all ignorable dependencies.</span>
<span class="sd"> Overrides the other ignore_* parameters.</span>
<span class="sd"> :type ignore_all_deps: bool</span>
<span class="sd"> :param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs</span>
<span class="sd"> (e.g. for Backfills)</span>
<span class="sd"> :type ignore_depends_on_past: bool</span>
<span class="sd"> :param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past</span>
<span class="sd"> and trigger rule</span>
<span class="sd"> :type ignore_task_deps: bool</span>
<span class="sd"> :param ignore_ti_state: Ignore the task instance&#39;s previous failure/success</span>
<span class="sd"> :type ignore_ti_state: bool</span>
<span class="sd"> :param local: Whether to run the task locally</span>
<span class="sd"> :type local: bool</span>
<span class="sd"> :param pickle_id: If the DAG was serialized to the DB, the ID</span>
<span class="sd"> associated with the pickled DAG</span>
<span class="sd"> :type pickle_id: unicode</span>
<span class="sd"> :param file_path: path to the file containing the DAG definition</span>
<span class="sd"> :param raw: raw mode (needs more details)</span>
<span class="sd"> :param job_id: job ID (needs more details)</span>
<span class="sd"> :param pool: the Airflow pool that the task should run in</span>
<span class="sd"> :type pool: unicode</span>
<span class="sd"> :param cfg_path: the path to the configuration file</span>
<span class="sd"> :type cfg_path: basestring</span>
<span class="sd"> :return: shell command that can be used to run the task instance</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">iso</span> <span class="o">=</span> <span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
<span class="n">cmd</span> <span class="o">=</span> <span class="p">[</span><span class="s2">&quot;airflow&quot;</span><span class="p">,</span> <span class="s2">&quot;run&quot;</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">dag_id</span><span class="p">),</span> <span class="nb">str</span><span class="p">(</span><span class="n">task_id</span><span class="p">),</span> <span class="nb">str</span><span class="p">(</span><span class="n">iso</span><span class="p">)]</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;--mark_success&quot;</span><span class="p">])</span> <span class="k">if</span> <span class="n">mark_success</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;--pickle&quot;</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">pickle_id</span><span class="p">)])</span> <span class="k">if</span> <span class="n">pickle_id</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;--job_id&quot;</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">job_id</span><span class="p">)])</span> <span class="k">if</span> <span class="n">job_id</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;-A&quot;</span><span class="p">])</span> <span class="k">if</span> <span class="n">ignore_all_deps</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;-i&quot;</span><span class="p">])</span> <span class="k">if</span> <span class="n">ignore_task_deps</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;-I&quot;</span><span class="p">])</span> <span class="k">if</span> <span class="n">ignore_depends_on_past</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;--force&quot;</span><span class="p">])</span> <span class="k">if</span> <span class="n">ignore_ti_state</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;--local&quot;</span><span class="p">])</span> <span class="k">if</span> <span class="n">local</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;--pool&quot;</span><span class="p">,</span> <span class="n">pool</span><span class="p">])</span> <span class="k">if</span> <span class="n">pool</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;--raw&quot;</span><span class="p">])</span> <span class="k">if</span> <span class="n">raw</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;-sd&quot;</span><span class="p">,</span> <span class="n">file_path</span><span class="p">])</span> <span class="k">if</span> <span class="n">file_path</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">&quot;--cfg_path&quot;</span><span class="p">,</span> <span class="n">cfg_path</span><span class="p">])</span> <span class="k">if</span> <span class="n">cfg_path</span> <span class="k">else</span> <span class="kc">None</span>
<span class="k">return</span> <span class="n">cmd</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="TaskInstance.log_filepath"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.log_filepath">[docs]</a> <span class="k">def</span> <span class="nf">log_filepath</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">iso</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
<span class="n">log</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">expanduser</span><span class="p">(</span><span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;core&#39;</span><span class="p">,</span> <span class="s1">&#39;BASE_LOG_FOLDER&#39;</span><span class="p">))</span>
<span class="k">return</span> <span class="p">(</span><span class="s2">&quot;</span><span class="si">{log}</span><span class="s2">/</span><span class="si">{dag_id}</span><span class="s2">/</span><span class="si">{task_id}</span><span class="s2">/</span><span class="si">{iso}</span><span class="s2">.log&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">log</span><span class="o">=</span><span class="n">log</span><span class="p">,</span> <span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">iso</span><span class="o">=</span><span class="n">iso</span><span class="p">))</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="TaskInstance.log_url"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.log_url">[docs]</a> <span class="k">def</span> <span class="nf">log_url</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">iso</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
<span class="n">base_url</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;webserver&#39;</span><span class="p">,</span> <span class="s1">&#39;BASE_URL&#39;</span><span class="p">)</span>
<span class="n">relative_url</span> <span class="o">=</span> <span class="s1">&#39;/log?execution_date=</span><span class="si">{iso}</span><span class="s1">&amp;task_id=</span><span class="si">{task_id}</span><span class="s1">&amp;dag_id=</span><span class="si">{dag_id}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">iso</span><span class="o">=</span><span class="n">quote_plus</span><span class="p">(</span><span class="n">iso</span><span class="p">),</span> <span class="n">task_id</span><span class="o">=</span><span class="n">quote_plus</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">),</span> <span class="n">dag_id</span><span class="o">=</span><span class="n">quote_plus</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">))</span>
<span class="k">if</span> <span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">&#39;webserver&#39;</span><span class="p">,</span> <span class="s1">&#39;rbac&#39;</span><span class="p">):</span>
<span class="k">return</span> <span class="s1">&#39;</span><span class="si">{base_url}{relative_url}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">base_url</span><span class="o">=</span><span class="n">base_url</span><span class="p">,</span> <span class="n">relative_url</span><span class="o">=</span><span class="n">relative_url</span><span class="p">)</span>
<span class="k">return</span> <span class="s1">&#39;</span><span class="si">{base_url}</span><span class="s1">/admin/airflow</span><span class="si">{relative_url}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">base_url</span><span class="o">=</span><span class="n">base_url</span><span class="p">,</span> <span class="n">relative_url</span><span class="o">=</span><span class="n">relative_url</span><span class="p">)</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="TaskInstance.mark_success_url"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.mark_success_url">[docs]</a> <span class="k">def</span> <span class="nf">mark_success_url</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">iso</span> <span class="o">=</span> <span class="n">quote</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">())</span>
<span class="n">base_url</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;webserver&#39;</span><span class="p">,</span> <span class="s1">&#39;BASE_URL&#39;</span><span class="p">)</span>
<span class="k">return</span> <span class="n">base_url</span> <span class="o">+</span> <span class="p">(</span>
<span class="s2">&quot;/success&quot;</span>
<span class="s2">&quot;?task_id=</span><span class="si">{task_id}</span><span class="s2">&quot;</span>
<span class="s2">&quot;&amp;dag_id=</span><span class="si">{dag_id}</span><span class="s2">&quot;</span>
<span class="s2">&quot;&amp;execution_date=</span><span class="si">{iso}</span><span class="s2">&quot;</span>
<span class="s2">&quot;&amp;upstream=false&quot;</span>
<span class="s2">&quot;&amp;downstream=false&quot;</span>
<span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">iso</span><span class="o">=</span><span class="n">iso</span><span class="p">)</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.current_state"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.current_state">[docs]</a> <span class="k">def</span> <span class="nf">current_state</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Get the very latest state from the database, if a session is passed,</span>
<span class="sd"> we use it and looking up the state becomes part of the session, otherwise</span>
<span class="sd"> a new session is used.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span>
<span class="n">ti</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">TI</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span>
<span class="n">TI</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span>
<span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span>
<span class="k">if</span> <span class="n">ti</span><span class="p">:</span>
<span class="n">state</span> <span class="o">=</span> <span class="n">ti</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">state</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">state</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">return</span> <span class="n">state</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.error"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.error">[docs]</a> <span class="k">def</span> <span class="nf">error</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Forces the task instance&#39;s state to FAILED in the database.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">&quot;Recording the task instance as FAILED&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">FAILED</span>
<span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.refresh_from_db"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.refresh_from_db">[docs]</a> <span class="k">def</span> <span class="nf">refresh_from_db</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">lock_for_update</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Refreshes the task instance from the database based on the primary key</span>
<span class="sd"> :param lock_for_update: if True, indicates that the database should</span>
<span class="sd"> lock the TaskInstance (issuing a FOR UPDATE clause) until the</span>
<span class="sd"> session is committed.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span>
<span class="n">qry</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">TI</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span>
<span class="n">TI</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
<span class="k">if</span> <span class="n">lock_for_update</span><span class="p">:</span>
<span class="n">ti</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">with_for_update</span><span class="p">()</span><span class="o">.</span><span class="n">first</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">ti</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">first</span><span class="p">()</span>
<span class="k">if</span> <span class="n">ti</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">state</span>
<span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">start_date</span>
<span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">end_date</span>
<span class="c1"># Get the raw value of try_number column, don&#39;t read through the</span>
<span class="c1"># accessor here otherwise it will be incremented by one already.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">_try_number</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span>
<span class="bp">self</span><span class="o">.</span><span class="n">hostname</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">hostname</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pid</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">pid</span>
<span class="bp">self</span><span class="o">.</span><span class="n">executor_config</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">executor_config</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="kc">None</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.clear_xcom_data"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.clear_xcom_data">[docs]</a> <span class="k">def</span> <span class="nf">clear_xcom_data</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Clears all XCom data from the database for the task instance</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">XCom</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">XCom</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">XCom</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span>
<span class="n">XCom</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span>
<span class="p">)</span><span class="o">.</span><span class="n">delete</span><span class="p">()</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="TaskInstance.key"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.key">[docs]</a> <span class="k">def</span> <span class="nf">key</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a tuple that identifies the task instance uniquely</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.set_state"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.set_state">[docs]</a> <span class="k">def</span> <span class="nf">set_state</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">commit</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span>
<span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="k">if</span> <span class="n">commit</span><span class="p">:</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="TaskInstance.is_premature"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.is_premature">[docs]</a> <span class="k">def</span> <span class="nf">is_premature</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns whether a task is in UP_FOR_RETRY state and its retry interval</span>
<span class="sd"> has not yet elapsed.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># is the task still in the retry waiting period?</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">ready_for_retry</span><span class="p">()</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.are_dependents_done"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.are_dependents_done">[docs]</a> <span class="k">def</span> <span class="nf">are_dependents_done</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Checks whether the dependents of this task instance have all succeeded.</span>
<span class="sd"> This is meant to be used by wait_for_downstream.</span>
<span class="sd"> This is useful when you do not want to start processing the next</span>
<span class="sd"> schedule of a task until the dependents are done. For instance,</span>
<span class="sd"> if the task DROPs and recreates a table.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">task</span><span class="o">.</span><span class="n">downstream_task_ids</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">True</span>
<span class="n">ti</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">func</span><span class="o">.</span><span class="n">count</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">task_id</span><span class="p">))</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">task_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">downstream_task_ids</span><span class="p">),</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">,</span>
<span class="p">)</span>
<span class="n">count</span> <span class="o">=</span> <span class="n">ti</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span>
<span class="k">return</span> <span class="n">count</span> <span class="o">==</span> <span class="nb">len</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">downstream_task_ids</span><span class="p">)</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance._get_previous_ti"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance._get_previous_ti">[docs]</a> <span class="k">def</span> <span class="nf">_get_previous_ti</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="c1"># type: (Optional[str], Session) -&gt; Optional[&#39;TaskInstance&#39;]</span>
<span class="n">dag</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span>
<span class="k">if</span> <span class="n">dag</span><span class="p">:</span>
<span class="n">dr</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_dagrun</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
<span class="c1"># LEGACY: most likely running from unit tests</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">dr</span><span class="p">:</span>
<span class="c1"># Means that this TI is NOT being run from a DR, but from a catchup</span>
<span class="n">previous_scheduled_date</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">previous_schedule</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">previous_scheduled_date</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">None</span>
<span class="k">return</span> <span class="n">TaskInstance</span><span class="p">(</span><span class="n">task</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="o">=</span><span class="n">previous_scheduled_date</span><span class="p">)</span>
<span class="n">dr</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">dag</span>
<span class="c1"># We always ignore schedule in dagrun lookup when `state` is given or `schedule_interval is None`.</span>
<span class="c1"># For legacy reasons, when `catchup=True`, we use `get_previous_scheduled_dagrun` unless</span>
<span class="c1"># `ignore_schedule` is `True`.</span>
<span class="n">ignore_schedule</span> <span class="o">=</span> <span class="n">state</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">or</span> <span class="n">dag</span><span class="o">.</span><span class="n">schedule_interval</span> <span class="ow">is</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">dag</span><span class="o">.</span><span class="n">catchup</span> <span class="ow">is</span> <span class="kc">True</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">ignore_schedule</span><span class="p">:</span>
<span class="n">last_dagrun</span> <span class="o">=</span> <span class="n">dr</span><span class="o">.</span><span class="n">get_previous_scheduled_dagrun</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">last_dagrun</span> <span class="o">=</span> <span class="n">dr</span><span class="o">.</span><span class="n">get_previous_dagrun</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span> <span class="n">state</span><span class="o">=</span><span class="n">state</span><span class="p">)</span>
<span class="k">if</span> <span class="n">last_dagrun</span><span class="p">:</span>
<span class="k">return</span> <span class="n">last_dagrun</span><span class="o">.</span><span class="n">get_task_instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">None</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="TaskInstance.previous_ti"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.previous_ti">[docs]</a> <span class="k">def</span> <span class="nf">previous_ti</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="c1"># type: () -&gt; Optional[&#39;TaskInstance&#39;]</span>
<span class="sd">&quot;&quot;&quot;The task instance for the task that ran before this task instance.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_previous_ti</span><span class="p">()</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="TaskInstance.previous_ti_success"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.previous_ti_success">[docs]</a> <span class="k">def</span> <span class="nf">previous_ti_success</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="c1"># type: () -&gt; Optional[&#39;TaskInstance&#39;]</span>
<span class="sd">&quot;&quot;&quot;The ti from prior successful dag run for this task, by execution date.&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_previous_ti</span><span class="p">(</span><span class="n">state</span><span class="o">=</span><span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">)</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="TaskInstance.previous_execution_date_success"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.previous_execution_date_success">[docs]</a> <span class="k">def</span> <span class="nf">previous_execution_date_success</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="c1"># type: () -&gt; Optional[pendulum.datetime]</span>
<span class="sd">&quot;&quot;&quot;The execution date from property previous_ti_success.&quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;previous_execution_date_success was called&quot;</span><span class="p">)</span>
<span class="n">prev_ti</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_previous_ti</span><span class="p">(</span><span class="n">state</span><span class="o">=</span><span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">)</span>
<span class="k">return</span> <span class="n">prev_ti</span> <span class="ow">and</span> <span class="n">prev_ti</span><span class="o">.</span><span class="n">execution_date</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="TaskInstance.previous_start_date_success"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.previous_start_date_success">[docs]</a> <span class="k">def</span> <span class="nf">previous_start_date_success</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="c1"># type: () -&gt; Optional[pendulum.datetime]</span>
<span class="sd">&quot;&quot;&quot;The start date from property previous_ti_success.&quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;previous_start_date_success was called&quot;</span><span class="p">)</span>
<span class="n">prev_ti</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_previous_ti</span><span class="p">(</span><span class="n">state</span><span class="o">=</span><span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">)</span>
<span class="k">return</span> <span class="n">prev_ti</span> <span class="ow">and</span> <span class="n">prev_ti</span><span class="o">.</span><span class="n">start_date</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.are_dependencies_met"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.are_dependencies_met">[docs]</a> <span class="k">def</span> <span class="nf">are_dependencies_met</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">dep_context</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">verbose</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns whether or not all the conditions are met for this task instance to be run</span>
<span class="sd"> given the context for the dependencies (e.g. a task instance being force run from</span>
<span class="sd"> the UI will ignore some dependencies).</span>
<span class="sd"> :param dep_context: The execution context that determines the dependencies that</span>
<span class="sd"> should be evaluated.</span>
<span class="sd"> :type dep_context: DepContext</span>
<span class="sd"> :param session: database session</span>
<span class="sd"> :type session: sqlalchemy.orm.session.Session</span>
<span class="sd"> :param verbose: whether to log details on failed dependencies at</span>
<span class="sd"> info level (True) or debug level (False)</span>
<span class="sd"> :type verbose: bool</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">dep_context</span> <span class="o">=</span> <span class="n">dep_context</span> <span class="ow">or</span> <span class="n">DepContext</span><span class="p">()</span>
<span class="n">failed</span> <span class="o">=</span> <span class="kc">False</span>
<span class="n">verbose_aware_logger</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span> <span class="k">if</span> <span class="n">verbose</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span>
<span class="k">for</span> <span class="n">dep_status</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_failed_dep_statuses</span><span class="p">(</span>
<span class="n">dep_context</span><span class="o">=</span><span class="n">dep_context</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">):</span>
<span class="n">failed</span> <span class="o">=</span> <span class="kc">True</span>
<span class="n">verbose_aware_logger</span><span class="p">(</span>
<span class="s2">&quot;Dependencies not met for </span><span class="si">%s</span><span class="s2">, dependency &#39;</span><span class="si">%s</span><span class="s2">&#39; FAILED: </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">dep_name</span><span class="p">,</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">reason</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">failed</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="n">verbose_aware_logger</span><span class="p">(</span><span class="s2">&quot;Dependencies all met for </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">True</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.get_failed_dep_statuses"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.get_failed_dep_statuses">[docs]</a> <span class="k">def</span> <span class="nf">get_failed_dep_statuses</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">dep_context</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">dep_context</span> <span class="o">=</span> <span class="n">dep_context</span> <span class="ow">or</span> <span class="n">DepContext</span><span class="p">()</span>
<span class="k">for</span> <span class="n">dep</span> <span class="ow">in</span> <span class="n">dep_context</span><span class="o">.</span><span class="n">deps</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">deps</span><span class="p">:</span>
<span class="k">for</span> <span class="n">dep_status</span> <span class="ow">in</span> <span class="n">dep</span><span class="o">.</span><span class="n">get_dep_statuses</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">session</span><span class="p">,</span>
<span class="n">dep_context</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span>
<span class="s2">&quot;</span><span class="si">%s</span><span class="s2"> dependency &#39;</span><span class="si">%s</span><span class="s2">&#39; PASSED: </span><span class="si">%s</span><span class="s2">, </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span>
<span class="bp">self</span><span class="p">,</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">dep_name</span><span class="p">,</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">passed</span><span class="p">,</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">reason</span>
<span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">passed</span><span class="p">:</span>
<span class="k">yield</span> <span class="n">dep_status</span></div>
<div class="viewcode-block" id="TaskInstance.__repr__"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.__repr__">[docs]</a> <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">(</span>
<span class="s2">&quot;&lt;TaskInstance: </span><span class="si">{ti.dag_id}</span><span class="s2">.</span><span class="si">{ti.task_id}</span><span class="s2"> &quot;</span>
<span class="s2">&quot;</span><span class="si">{ti.execution_date}</span><span class="s2"> [</span><span class="si">{ti.state}</span><span class="s2">]&gt;&quot;</span>
<span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.next_retry_datetime"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.next_retry_datetime">[docs]</a> <span class="k">def</span> <span class="nf">next_retry_datetime</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Get datetime of the next retry if the task instance fails. For exponential</span>
<span class="sd"> backoff, retry_delay is used as base and will be converted to seconds.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">delay</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retry_delay</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retry_exponential_backoff</span><span class="p">:</span>
<span class="c1"># If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus,</span>
<span class="c1"># we must round up prior to converting to an int, otherwise a divide by zero error</span>
<span class="c1"># will occur in the modded_hash calculation.</span>
<span class="n">min_backoff</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">math</span><span class="o">.</span><span class="n">ceil</span><span class="p">(</span><span class="n">delay</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span> <span class="o">*</span> <span class="p">(</span><span class="mi">2</span> <span class="o">**</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">-</span> <span class="mi">2</span><span class="p">))))</span>
<span class="c1"># deterministic per task instance</span>
<span class="nb">hash</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">hashlib</span><span class="o">.</span><span class="n">sha1</span><span class="p">(</span><span class="s2">&quot;</span><span class="si">{}</span><span class="s2">#</span><span class="si">{}</span><span class="s2">#</span><span class="si">{}</span><span class="s2">#</span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">try_number</span><span class="p">)</span>
<span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">&#39;utf-8&#39;</span><span class="p">))</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">(),</span> <span class="mi">16</span><span class="p">)</span>
<span class="c1"># between min_backoff and 2 * min_backoff - 1 (inclusive)</span>
<span class="n">modded_hash</span> <span class="o">=</span> <span class="n">min_backoff</span> <span class="o">+</span> <span class="nb">hash</span> <span class="o">%</span> <span class="n">min_backoff</span>
<span class="c1"># timedelta has a maximum representable value. The exponentiation</span>
<span class="c1"># here means this value can be exceeded after a certain number</span>
<span class="c1"># of tries (around 50 if the initial delay is 1s, even fewer if</span>
<span class="c1"># the delay is larger). Cap the value here before creating a</span>
<span class="c1"># timedelta object so the operation doesn&#39;t fail.</span>
<span class="n">delay_backoff_in_seconds</span> <span class="o">=</span> <span class="nb">min</span><span class="p">(</span>
<span class="n">modded_hash</span><span class="p">,</span>
<span class="n">timedelta</span><span class="o">.</span><span class="n">max</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span> <span class="o">-</span> <span class="mi">1</span>
<span class="p">)</span>
<span class="n">delay</span> <span class="o">=</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">delay_backoff_in_seconds</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">max_retry_delay</span><span class="p">:</span>
<span class="n">delay</span> <span class="o">=</span> <span class="nb">min</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">max_retry_delay</span><span class="p">,</span> <span class="n">delay</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">+</span> <span class="n">delay</span></div>
<div class="viewcode-block" id="TaskInstance.ready_for_retry"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.ready_for_retry">[docs]</a> <span class="k">def</span> <span class="nf">ready_for_retry</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Checks on whether the task instance is in the right state and timeframe</span>
<span class="sd"> to be retried.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span>
<span class="bp">self</span><span class="o">.</span><span class="n">next_retry_datetime</span><span class="p">()</span> <span class="o">&lt;</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">())</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.pool_full"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.pool_full">[docs]</a> <span class="k">def</span> <span class="nf">pool_full</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns a boolean as to whether the slot pool is full, i.e. has no</span>
<span class="sd"> room for this task to run</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">pool</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="n">pool</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">session</span>
<span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">Pool</span><span class="p">)</span>
<span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">Pool</span><span class="o">.</span><span class="n">pool</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">pool</span><span class="p">)</span>
<span class="o">.</span><span class="n">first</span><span class="p">()</span>
<span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">pool</span><span class="p">:</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="n">open_slots</span> <span class="o">=</span> <span class="n">pool</span><span class="o">.</span><span class="n">open_slots</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
<span class="k">return</span> <span class="n">open_slots</span> <span class="o">&lt;=</span> <span class="mi">0</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.get_dagrun"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.get_dagrun">[docs]</a> <span class="k">def</span> <span class="nf">get_dagrun</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns the DagRun for this TaskInstance</span>
<span class="sd"> :param session: database session used to query for the DagRun</span>
<span class="sd"> :return: DagRun</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="kn">from</span> <span class="nn">airflow.models.dagrun</span> <span class="k">import</span> <span class="n">DagRun</span> <span class="c1"># Avoid circular import</span>
<span class="n">dr</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagRun</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">DagRun</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span>
<span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span>
<span class="k">return</span> <span class="n">dr</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance._check_and_change_state_before_execution"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance._check_and_change_state_before_execution">[docs]</a> <span class="k">def</span> <span class="nf">_check_and_change_state_before_execution</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">verbose</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">ignore_all_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">test_mode</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Checks dependencies and then sets state to RUNNING if they are met. Returns</span>
<span class="sd"> True if and only if state is set to RUNNING, which implies that task should be</span>
<span class="sd"> executed, in preparation for _run_raw_task</span>
<span class="sd"> :param verbose: whether to turn on more verbose logging</span>
<span class="sd"> :type verbose: bool</span>
<span class="sd"> :param ignore_all_deps: Ignore all of the non-critical dependencies, just runs</span>
<span class="sd"> :type ignore_all_deps: bool</span>
<span class="sd"> :param ignore_depends_on_past: Ignore depends_on_past DAG attribute</span>
<span class="sd"> :type ignore_depends_on_past: bool</span>
<span class="sd"> :param ignore_task_deps: Don&#39;t check the dependencies of this TI&#39;s task</span>
<span class="sd"> :type ignore_task_deps: bool</span>
<span class="sd"> :param ignore_ti_state: Disregards previous task instance state</span>
<span class="sd"> :type ignore_ti_state: bool</span>
<span class="sd"> :param mark_success: Don&#39;t run the task, mark its state as success</span>
<span class="sd"> :type mark_success: bool</span>
<span class="sd"> :param test_mode: Doesn&#39;t record success or failure in the DB</span>
<span class="sd"> :type test_mode: bool</span>
<span class="sd"> :param pool: specifies the pool to use to run the task instance</span>
<span class="sd"> :type pool: str</span>
<span class="sd"> :return: whether the state was changed to running or not</span>
<span class="sd"> :rtype: bool</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pool</span> <span class="o">=</span> <span class="n">pool</span> <span class="ow">or</span> <span class="n">task</span><span class="o">.</span><span class="n">pool</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_mode</span> <span class="o">=</span> <span class="n">test_mode</span>
<span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span> <span class="n">lock_for_update</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">job_id</span> <span class="o">=</span> <span class="n">job_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">hostname</span> <span class="o">=</span> <span class="n">get_hostname</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">operator</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">ignore_all_deps</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">ignore_ti_state</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">:</span>
<span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">&#39;previously_succeeded&#39;</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span>
<span class="n">queue_dep_context</span> <span class="o">=</span> <span class="n">DepContext</span><span class="p">(</span>
<span class="n">deps</span><span class="o">=</span><span class="n">QUEUE_DEPS</span><span class="p">,</span>
<span class="n">ignore_all_deps</span><span class="o">=</span><span class="n">ignore_all_deps</span><span class="p">,</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">,</span>
<span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="n">ignore_task_deps</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">are_dependencies_met</span><span class="p">(</span>
<span class="n">dep_context</span><span class="o">=</span><span class="n">queue_dep_context</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span>
<span class="n">verbose</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="c1"># TODO: Logging needs cleanup, not clear what is being printed</span>
<span class="n">hr</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="se">\n</span><span class="s2">&quot;</span> <span class="o">+</span> <span class="p">(</span><span class="s2">&quot;-&quot;</span> <span class="o">*</span> <span class="mi">80</span><span class="p">)</span> <span class="c1"># Line break</span>
<span class="c1"># For reporting purposes, we report based on 1-indexed,</span>
<span class="c1"># not 0-indexed lists (i.e. Attempt 1 instead of</span>
<span class="c1"># Attempt 0 for the first attempt).</span>
<span class="c1"># Set the task start date. In case it was re-scheduled use the initial</span>
<span class="c1"># start date that is recorded in task_reschedule table</span>
<span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="n">task_reschedules</span> <span class="o">=</span> <span class="n">TaskReschedule</span><span class="o">.</span><span class="n">find_for_task_instance</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">)</span>
<span class="k">if</span> <span class="n">task_reschedules</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">task_reschedules</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">start_date</span>
<span class="n">dep_context</span> <span class="o">=</span> <span class="n">DepContext</span><span class="p">(</span>
<span class="n">deps</span><span class="o">=</span><span class="n">RUN_DEPS</span> <span class="o">-</span> <span class="n">QUEUE_DEPS</span><span class="p">,</span>
<span class="n">ignore_all_deps</span><span class="o">=</span><span class="n">ignore_all_deps</span><span class="p">,</span>
<span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="n">ignore_task_deps</span><span class="p">,</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">)</span>
<span class="n">runnable</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">are_dependencies_met</span><span class="p">(</span>
<span class="n">dep_context</span><span class="o">=</span><span class="n">dep_context</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span>
<span class="n">verbose</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">runnable</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">mark_success</span><span class="p">:</span>
<span class="c1"># FIXME: we might have hit concurrency limits, which means we probably</span>
<span class="c1"># have been running prematurely. This should be handled in the</span>
<span class="c1"># scheduling mechanism.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">NONE</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="n">hr</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s2">&quot;FIXME: Rescheduling due to concurrency limits reached at task runtime. Attempt </span><span class="si">%s</span><span class="s2"> of &quot;</span>
<span class="s2">&quot;</span><span class="si">%s</span><span class="s2">. State set to NONE.&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">+</span> <span class="mi">1</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="n">hr</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">queued_dttm</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Queuing into pool </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">pool</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="c1"># Another worker might have started running this task instance while</span>
<span class="c1"># the current worker process was blocked on refresh_from_db</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">&quot;Task Instance already running </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="k">return</span> <span class="kc">False</span>
<span class="c1"># print status message</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="n">hr</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Starting attempt </span><span class="si">%s</span><span class="s2"> of </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">+</span> <span class="mi">1</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="n">hr</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">+=</span> <span class="mi">1</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span>
<span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">Log</span><span class="p">(</span><span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">,</span> <span class="bp">self</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pid</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">getpid</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span>
<span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="c1"># Closing all pooled connections to prevent</span>
<span class="c1"># &quot;max number of connections reached&quot;</span>
<span class="n">settings</span><span class="o">.</span><span class="n">engine</span><span class="o">.</span><span class="n">dispose</span><span class="p">()</span>
<span class="k">if</span> <span class="n">verbose</span><span class="p">:</span>
<span class="k">if</span> <span class="n">mark_success</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Marking success for </span><span class="si">%s</span><span class="s2"> on </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">&quot;Executing </span><span class="si">%s</span><span class="s2"> on </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">True</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance._run_raw_task"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance._run_raw_task">[docs]</a> <span class="k">def</span> <span class="nf">_run_raw_task</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">test_mode</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Immediately runs the task (without checking or changing db state</span>
<span class="sd"> before execution) and then sets the appropriate final state after</span>
<span class="sd"> completion and runs any post-execute callbacks. Meant to be called</span>
<span class="sd"> only after another function changes the state to running.</span>
<span class="sd"> :param mark_success: Don&#39;t run the task, mark its state as success</span>
<span class="sd"> :type mark_success: bool</span>
<span class="sd"> :param test_mode: Doesn&#39;t record success or failure in the DB</span>
<span class="sd"> :type test_mode: bool</span>
<span class="sd"> :param pool: specifies the pool to use to run the task instance</span>
<span class="sd"> :type pool: str</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pool</span> <span class="o">=</span> <span class="n">pool</span> <span class="ow">or</span> <span class="n">task</span><span class="o">.</span><span class="n">pool</span>
<span class="bp">self</span><span class="o">.</span><span class="n">test_mode</span> <span class="o">=</span> <span class="n">test_mode</span>
<span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">job_id</span> <span class="o">=</span> <span class="n">job_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">hostname</span> <span class="o">=</span> <span class="n">get_hostname</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">operator</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span>
<span class="n">context</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">actual_start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">mark_success</span><span class="p">:</span>
<span class="n">context</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_template_context</span><span class="p">()</span>
<span class="n">task_copy</span> <span class="o">=</span> <span class="n">copy</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">task</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task</span> <span class="o">=</span> <span class="n">task_copy</span>
<span class="k">def</span> <span class="nf">signal_handler</span><span class="p">(</span><span class="n">signum</span><span class="p">,</span> <span class="n">frame</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">&quot;Received SIGTERM. Terminating subprocesses.&quot;</span><span class="p">)</span>
<span class="n">task_copy</span><span class="o">.</span><span class="n">on_kill</span><span class="p">()</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">&quot;Task received SIGTERM signal&quot;</span><span class="p">)</span>
<span class="n">signal</span><span class="o">.</span><span class="n">signal</span><span class="p">(</span><span class="n">signal</span><span class="o">.</span><span class="n">SIGTERM</span><span class="p">,</span> <span class="n">signal_handler</span><span class="p">)</span>
<span class="c1"># Don&#39;t clear Xcom until the task is certain to execute</span>
<span class="bp">self</span><span class="o">.</span><span class="n">clear_xcom_data</span><span class="p">()</span>
<span class="n">start_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">render_templates</span><span class="p">(</span><span class="n">context</span><span class="o">=</span><span class="n">context</span><span class="p">)</span>
<span class="n">task_copy</span><span class="o">.</span><span class="n">pre_execute</span><span class="p">(</span><span class="n">context</span><span class="o">=</span><span class="n">context</span><span class="p">)</span>
<span class="c1"># If a timeout is specified for the task, make it fail</span>
<span class="c1"># if it goes beyond</span>
<span class="n">result</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">task_copy</span><span class="o">.</span><span class="n">execution_timeout</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">with</span> <span class="n">timeout</span><span class="p">(</span><span class="nb">int</span><span class="p">(</span>
<span class="n">task_copy</span><span class="o">.</span><span class="n">execution_timeout</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">())):</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">task_copy</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="o">=</span><span class="n">context</span><span class="p">)</span>
<span class="k">except</span> <span class="n">AirflowTaskTimeout</span><span class="p">:</span>
<span class="n">task_copy</span><span class="o">.</span><span class="n">on_kill</span><span class="p">()</span>
<span class="k">raise</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">task_copy</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="o">=</span><span class="n">context</span><span class="p">)</span>
<span class="c1"># If the task returns a result, push an XCom containing it</span>
<span class="k">if</span> <span class="n">task_copy</span><span class="o">.</span><span class="n">do_xcom_push</span> <span class="ow">and</span> <span class="n">result</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="n">XCOM_RETURN_KEY</span><span class="p">,</span> <span class="n">value</span><span class="o">=</span><span class="n">result</span><span class="p">)</span>
<span class="n">task_copy</span><span class="o">.</span><span class="n">post_execute</span><span class="p">(</span><span class="n">context</span><span class="o">=</span><span class="n">context</span><span class="p">,</span> <span class="n">result</span><span class="o">=</span><span class="n">result</span><span class="p">)</span>
<span class="n">end_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span>
<span class="n">duration</span> <span class="o">=</span> <span class="n">end_time</span> <span class="o">-</span> <span class="n">start_time</span>
<span class="n">Stats</span><span class="o">.</span><span class="n">timing</span><span class="p">(</span>
<span class="s1">&#39;dag.</span><span class="si">{dag_id}</span><span class="s1">.</span><span class="si">{task_id}</span><span class="s1">.duration&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">dag_id</span><span class="o">=</span><span class="n">task_copy</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">task_id</span><span class="o">=</span><span class="n">task_copy</span><span class="o">.</span><span class="n">task_id</span><span class="p">),</span>
<span class="n">duration</span><span class="p">)</span>
<span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">&#39;operator_successes_</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">),</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span>
<span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">&#39;ti_successes&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">(</span><span class="n">lock_for_update</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span>
<span class="k">except</span> <span class="n">AirflowSkipException</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">(</span><span class="n">lock_for_update</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SKIPPED</span>
<span class="k">except</span> <span class="n">AirflowRescheduleException</span> <span class="k">as</span> <span class="n">reschedule_exception</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_handle_reschedule</span><span class="p">(</span><span class="n">actual_start_date</span><span class="p">,</span> <span class="n">reschedule_exception</span><span class="p">,</span> <span class="n">test_mode</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span>
<span class="k">return</span>
<span class="k">except</span> <span class="n">AirflowException</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">()</span>
<span class="c1"># for case when task is marked as success/failed externally</span>
<span class="c1"># current behavior doesn&#39;t hit the success callback</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="ow">in</span> <span class="p">{</span><span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">,</span> <span class="n">State</span><span class="o">.</span><span class="n">FAILED</span><span class="p">}:</span>
<span class="k">return</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">handle_failure</span><span class="p">(</span><span class="n">e</span><span class="p">,</span> <span class="n">test_mode</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span>
<span class="k">raise</span>
<span class="k">except</span> <span class="p">(</span><span class="ne">Exception</span><span class="p">,</span> <span class="ne">KeyboardInterrupt</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">handle_failure</span><span class="p">(</span><span class="n">e</span><span class="p">,</span> <span class="n">test_mode</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span>
<span class="k">raise</span>
<span class="c1"># Success callback</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">on_success_callback</span><span class="p">:</span>
<span class="n">task</span><span class="o">.</span><span class="n">on_success_callback</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e3</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">&quot;Failed when executing success callback&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e3</span><span class="p">)</span>
<span class="c1"># Recording SUCCESS</span>
<span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">set_duration</span><span class="p">()</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span>
<span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">Log</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">,</span> <span class="bp">self</span><span class="p">))</span>
<span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.run"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.run">[docs]</a> <span class="k">def</span> <span class="nf">run</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">verbose</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span>
<span class="n">ignore_all_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">test_mode</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">res</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_check_and_change_state_before_execution</span><span class="p">(</span>
<span class="n">verbose</span><span class="o">=</span><span class="n">verbose</span><span class="p">,</span>
<span class="n">ignore_all_deps</span><span class="o">=</span><span class="n">ignore_all_deps</span><span class="p">,</span>
<span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span>
<span class="n">ignore_task_deps</span><span class="o">=</span><span class="n">ignore_task_deps</span><span class="p">,</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">,</span>
<span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span>
<span class="n">test_mode</span><span class="o">=</span><span class="n">test_mode</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="n">job_id</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span>
<span class="k">if</span> <span class="n">res</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_run_raw_task</span><span class="p">(</span>
<span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span>
<span class="n">test_mode</span><span class="o">=</span><span class="n">test_mode</span><span class="p">,</span>
<span class="n">job_id</span><span class="o">=</span><span class="n">job_id</span><span class="p">,</span>
<span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.dry_run"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.dry_run">[docs]</a> <span class="k">def</span> <span class="nf">dry_run</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span>
<span class="n">task_copy</span> <span class="o">=</span> <span class="n">copy</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">task</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task</span> <span class="o">=</span> <span class="n">task_copy</span>
<span class="bp">self</span><span class="o">.</span><span class="n">render_templates</span><span class="p">()</span>
<span class="n">task_copy</span><span class="o">.</span><span class="n">dry_run</span><span class="p">()</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance._handle_reschedule"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance._handle_reschedule">[docs]</a> <span class="k">def</span> <span class="nf">_handle_reschedule</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">actual_start_date</span><span class="p">,</span> <span class="n">reschedule_exception</span><span class="p">,</span> <span class="n">test_mode</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">context</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="c1"># Don&#39;t record reschedule request in test mode</span>
<span class="k">if</span> <span class="n">test_mode</span><span class="p">:</span>
<span class="k">return</span>
<span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">set_duration</span><span class="p">()</span>
<span class="c1"># Log reschedule request</span>
<span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">TaskReschedule</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span><span class="p">,</span>
<span class="n">actual_start_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span><span class="p">,</span>
<span class="n">reschedule_exception</span><span class="o">.</span><span class="n">reschedule_date</span><span class="p">))</span>
<span class="c1"># set state</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RESCHEDULE</span>
<span class="c1"># Decrement try_number so subsequent runs will use the same try number and write</span>
<span class="c1"># to same log file.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">-=</span> <span class="mi">1</span>
<span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Rescheduling task, marking task as UP_FOR_RESCHEDULE&#39;</span><span class="p">)</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.handle_failure"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.handle_failure">[docs]</a> <span class="k">def</span> <span class="nf">handle_failure</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">error</span><span class="p">,</span> <span class="n">test_mode</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">context</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">error</span><span class="p">)</span>
<span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span>
<span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">set_duration</span><span class="p">()</span>
<span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">&#39;operator_failures_</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">),</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span>
<span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">&#39;ti_failures&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span>
<span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">Log</span><span class="p">(</span><span class="n">State</span><span class="o">.</span><span class="n">FAILED</span><span class="p">,</span> <span class="bp">self</span><span class="p">))</span>
<span class="c1"># Log failure duration</span>
<span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">TaskFail</span><span class="p">(</span><span class="n">task</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span><span class="p">))</span>
<span class="k">if</span> <span class="n">context</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">context</span><span class="p">[</span><span class="s1">&#39;exception&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">error</span>
<span class="c1"># Let&#39;s go deeper</span>
<span class="k">try</span><span class="p">:</span>
<span class="c1"># Since this function is called only when the TI state is running,</span>
<span class="c1"># try_number contains the current try_number (not the next). We</span>
<span class="c1"># only mark task instance as FAILED if the next task instance</span>
<span class="c1"># try_number exceeds the max_tries.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_eligible_to_retry</span><span class="p">():</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Marking task as UP_FOR_RETRY&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">email_on_retry</span> <span class="ow">and</span> <span class="n">task</span><span class="o">.</span><span class="n">email</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">email_alert</span><span class="p">(</span><span class="n">error</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">FAILED</span>
<span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">retries</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;All retries failed; marking task as FAILED&#39;</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Marking task as FAILED.&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">email_on_failure</span> <span class="ow">and</span> <span class="n">task</span><span class="o">.</span><span class="n">email</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">email_alert</span><span class="p">(</span><span class="n">error</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e2</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s1">&#39;Failed to send email to: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">email</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e2</span><span class="p">)</span>
<span class="c1"># Handling callbacks pessimistically</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span> <span class="n">task</span><span class="o">.</span><span class="n">on_retry_callback</span><span class="p">:</span>
<span class="n">task</span><span class="o">.</span><span class="n">on_retry_callback</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">FAILED</span> <span class="ow">and</span> <span class="n">task</span><span class="o">.</span><span class="n">on_failure_callback</span><span class="p">:</span>
<span class="n">task</span><span class="o">.</span><span class="n">on_failure_callback</span><span class="p">(</span><span class="n">context</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e3</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">&quot;Failed at executing callback&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e3</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span>
<span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div>
<div class="viewcode-block" id="TaskInstance.is_eligible_to_retry"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.is_eligible_to_retry">[docs]</a> <span class="k">def</span> <span class="nf">is_eligible_to_retry</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;Is task instance eligible for retry&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retries</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">&lt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.get_template_context"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.get_template_context">[docs]</a> <span class="k">def</span> <span class="nf">get_template_context</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span>
<span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">macros</span>
<span class="n">tables</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="s1">&#39;tables&#39;</span> <span class="ow">in</span> <span class="n">task</span><span class="o">.</span><span class="n">params</span><span class="p">:</span>
<span class="n">tables</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">params</span><span class="p">[</span><span class="s1">&#39;tables&#39;</span><span class="p">]</span>
<span class="n">params</span> <span class="o">=</span> <span class="p">{}</span>
<span class="n">run_id</span> <span class="o">=</span> <span class="s1">&#39;&#39;</span>
<span class="n">dag_run</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">task</span><span class="p">,</span> <span class="s1">&#39;dag&#39;</span><span class="p">):</span>
<span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">params</span><span class="p">:</span>
<span class="n">params</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">params</span><span class="p">)</span>
<span class="kn">from</span> <span class="nn">airflow.models.dagrun</span> <span class="k">import</span> <span class="n">DagRun</span> <span class="c1"># Avoid circular import</span>
<span class="n">dag_run</span> <span class="o">=</span> <span class="p">(</span>
<span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagRun</span><span class="p">)</span>
<span class="o">.</span><span class="n">filter_by</span><span class="p">(</span>
<span class="n">dag_id</span><span class="o">=</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">execution_date</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
<span class="o">.</span><span class="n">first</span><span class="p">()</span>
<span class="p">)</span>
<span class="n">run_id</span> <span class="o">=</span> <span class="n">dag_run</span><span class="o">.</span><span class="n">run_id</span> <span class="k">if</span> <span class="n">dag_run</span> <span class="k">else</span> <span class="kc">None</span>
<span class="n">session</span><span class="o">.</span><span class="n">expunge_all</span><span class="p">()</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="n">ds</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">&#39;%Y-%m-</span><span class="si">%d</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="n">ts</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span>
<span class="n">yesterday_ds</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">-</span> <span class="n">timedelta</span><span class="p">(</span><span class="mi">1</span><span class="p">))</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">&#39;%Y-%m-</span><span class="si">%d</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="n">tomorrow_ds</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">+</span> <span class="n">timedelta</span><span class="p">(</span><span class="mi">1</span><span class="p">))</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">&#39;%Y-%m-</span><span class="si">%d</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="c1"># For manually triggered dagruns that aren&#39;t run on a schedule, next/previous</span>
<span class="c1"># schedule dates don&#39;t make sense, and should be set to execution date for</span>
<span class="c1"># consistency with how execution_date is set for manually triggered tasks, i.e.</span>
<span class="c1"># triggered_date == execution_date.</span>
<span class="k">if</span> <span class="n">dag_run</span> <span class="ow">and</span> <span class="n">dag_run</span><span class="o">.</span><span class="n">external_trigger</span><span class="p">:</span>
<span class="n">prev_execution_date</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span>
<span class="n">next_execution_date</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">prev_execution_date</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">previous_schedule</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
<span class="n">next_execution_date</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">following_schedule</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>
<span class="n">next_ds</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">next_ds_nodash</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">next_execution_date</span><span class="p">:</span>
<span class="n">next_ds</span> <span class="o">=</span> <span class="n">next_execution_date</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">&#39;%Y-%m-</span><span class="si">%d</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="n">next_ds_nodash</span> <span class="o">=</span> <span class="n">next_ds</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;-&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="n">next_execution_date</span> <span class="o">=</span> <span class="n">pendulum</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="n">next_execution_date</span><span class="p">)</span>
<span class="n">prev_ds</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">prev_ds_nodash</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">if</span> <span class="n">prev_execution_date</span><span class="p">:</span>
<span class="n">prev_ds</span> <span class="o">=</span> <span class="n">prev_execution_date</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">&#39;%Y-%m-</span><span class="si">%d</span><span class="s1">&#39;</span><span class="p">)</span>
<span class="n">prev_ds_nodash</span> <span class="o">=</span> <span class="n">prev_ds</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;-&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="n">prev_execution_date</span> <span class="o">=</span> <span class="n">pendulum</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="n">prev_execution_date</span><span class="p">)</span>
<span class="n">ds_nodash</span> <span class="o">=</span> <span class="n">ds</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;-&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="n">ts_nodash</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">&#39;%Y%m</span><span class="si">%d</span><span class="s1">T%H%M%S&#39;</span><span class="p">)</span>
<span class="n">ts_nodash_with_tz</span> <span class="o">=</span> <span class="n">ts</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;-&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;:&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="n">yesterday_ds_nodash</span> <span class="o">=</span> <span class="n">yesterday_ds</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;-&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="n">tomorrow_ds_nodash</span> <span class="o">=</span> <span class="n">tomorrow_ds</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;-&#39;</span><span class="p">,</span> <span class="s1">&#39;&#39;</span><span class="p">)</span>
<span class="n">ti_key_str</span> <span class="o">=</span> <span class="s2">&quot;</span><span class="si">{dag_id}</span><span class="s2">__</span><span class="si">{task_id}</span><span class="s2">__</span><span class="si">{ds_nodash}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">dag_id</span><span class="o">=</span><span class="n">task</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="o">=</span><span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">ds_nodash</span><span class="o">=</span><span class="n">ds_nodash</span><span class="p">)</span>
<span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">params</span><span class="p">:</span>
<span class="n">params</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">params</span><span class="p">)</span>
<span class="k">if</span> <span class="n">configuration</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">&#39;core&#39;</span><span class="p">,</span> <span class="s1">&#39;dag_run_conf_overrides_params&#39;</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">overwrite_params_with_dag_run_conf</span><span class="p">(</span><span class="n">params</span><span class="o">=</span><span class="n">params</span><span class="p">,</span> <span class="n">dag_run</span><span class="o">=</span><span class="n">dag_run</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">VariableAccessor</span><span class="p">:</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Wrapper around Variable. This way you can get variables in templates by using</span>
<span class="sd"> {{ var.value.your_variable_name }}.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="nf">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">item</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">var</span>
<span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">var</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">VariableJsonAccessor</span><span class="p">:</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Wrapper around deserialized Variables. This way you can get variables</span>
<span class="sd"> in templates by using {{ var.json.your_variable_name }}.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">def</span> <span class="nf">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">item</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">,</span> <span class="n">deserialize_json</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">var</span>
<span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">var</span><span class="p">)</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;dag&#39;</span><span class="p">:</span> <span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="p">,</span>
<span class="s1">&#39;ds&#39;</span><span class="p">:</span> <span class="n">ds</span><span class="p">,</span>
<span class="s1">&#39;next_ds&#39;</span><span class="p">:</span> <span class="n">next_ds</span><span class="p">,</span>
<span class="s1">&#39;next_ds_nodash&#39;</span><span class="p">:</span> <span class="n">next_ds_nodash</span><span class="p">,</span>
<span class="s1">&#39;prev_ds&#39;</span><span class="p">:</span> <span class="n">prev_ds</span><span class="p">,</span>
<span class="s1">&#39;prev_ds_nodash&#39;</span><span class="p">:</span> <span class="n">prev_ds_nodash</span><span class="p">,</span>
<span class="s1">&#39;ds_nodash&#39;</span><span class="p">:</span> <span class="n">ds_nodash</span><span class="p">,</span>
<span class="s1">&#39;ts&#39;</span><span class="p">:</span> <span class="n">ts</span><span class="p">,</span>
<span class="s1">&#39;ts_nodash&#39;</span><span class="p">:</span> <span class="n">ts_nodash</span><span class="p">,</span>
<span class="s1">&#39;ts_nodash_with_tz&#39;</span><span class="p">:</span> <span class="n">ts_nodash_with_tz</span><span class="p">,</span>
<span class="s1">&#39;yesterday_ds&#39;</span><span class="p">:</span> <span class="n">yesterday_ds</span><span class="p">,</span>
<span class="s1">&#39;yesterday_ds_nodash&#39;</span><span class="p">:</span> <span class="n">yesterday_ds_nodash</span><span class="p">,</span>
<span class="s1">&#39;tomorrow_ds&#39;</span><span class="p">:</span> <span class="n">tomorrow_ds</span><span class="p">,</span>
<span class="s1">&#39;tomorrow_ds_nodash&#39;</span><span class="p">:</span> <span class="n">tomorrow_ds_nodash</span><span class="p">,</span>
<span class="s1">&#39;END_DATE&#39;</span><span class="p">:</span> <span class="n">ds</span><span class="p">,</span>
<span class="s1">&#39;end_date&#39;</span><span class="p">:</span> <span class="n">ds</span><span class="p">,</span>
<span class="s1">&#39;dag_run&#39;</span><span class="p">:</span> <span class="n">dag_run</span><span class="p">,</span>
<span class="s1">&#39;run_id&#39;</span><span class="p">:</span> <span class="n">run_id</span><span class="p">,</span>
<span class="s1">&#39;execution_date&#39;</span><span class="p">:</span> <span class="n">pendulum</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">),</span>
<span class="s1">&#39;prev_execution_date&#39;</span><span class="p">:</span> <span class="n">prev_execution_date</span><span class="p">,</span>
<span class="s1">&#39;prev_execution_date_success&#39;</span><span class="p">:</span> <span class="n">lazy_object_proxy</span><span class="o">.</span><span class="n">Proxy</span><span class="p">(</span>
<span class="k">lambda</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">previous_execution_date_success</span><span class="p">),</span>
<span class="s1">&#39;prev_start_date_success&#39;</span><span class="p">:</span> <span class="n">lazy_object_proxy</span><span class="o">.</span><span class="n">Proxy</span><span class="p">(</span><span class="k">lambda</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">previous_start_date_success</span><span class="p">),</span>
<span class="s1">&#39;next_execution_date&#39;</span><span class="p">:</span> <span class="n">next_execution_date</span><span class="p">,</span>
<span class="s1">&#39;latest_date&#39;</span><span class="p">:</span> <span class="n">ds</span><span class="p">,</span>
<span class="s1">&#39;macros&#39;</span><span class="p">:</span> <span class="n">macros</span><span class="p">,</span>
<span class="s1">&#39;params&#39;</span><span class="p">:</span> <span class="n">params</span><span class="p">,</span>
<span class="s1">&#39;tables&#39;</span><span class="p">:</span> <span class="n">tables</span><span class="p">,</span>
<span class="s1">&#39;task&#39;</span><span class="p">:</span> <span class="n">task</span><span class="p">,</span>
<span class="s1">&#39;task_instance&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="p">,</span>
<span class="s1">&#39;ti&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="p">,</span>
<span class="s1">&#39;task_instance_key_str&#39;</span><span class="p">:</span> <span class="n">ti_key_str</span><span class="p">,</span>
<span class="s1">&#39;conf&#39;</span><span class="p">:</span> <span class="n">configuration</span><span class="p">,</span>
<span class="s1">&#39;test_mode&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">test_mode</span><span class="p">,</span>
<span class="s1">&#39;var&#39;</span><span class="p">:</span> <span class="p">{</span>
<span class="s1">&#39;value&#39;</span><span class="p">:</span> <span class="n">VariableAccessor</span><span class="p">(),</span>
<span class="s1">&#39;json&#39;</span><span class="p">:</span> <span class="n">VariableJsonAccessor</span><span class="p">()</span>
<span class="p">},</span>
<span class="s1">&#39;inlets&#39;</span><span class="p">:</span> <span class="n">task</span><span class="o">.</span><span class="n">inlets</span><span class="p">,</span>
<span class="s1">&#39;outlets&#39;</span><span class="p">:</span> <span class="n">task</span><span class="o">.</span><span class="n">outlets</span><span class="p">,</span>
<span class="p">}</span></div>
<div class="viewcode-block" id="TaskInstance.overwrite_params_with_dag_run_conf"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.overwrite_params_with_dag_run_conf">[docs]</a> <span class="k">def</span> <span class="nf">overwrite_params_with_dag_run_conf</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">params</span><span class="p">,</span> <span class="n">dag_run</span><span class="p">):</span>
<span class="k">if</span> <span class="n">dag_run</span> <span class="ow">and</span> <span class="n">dag_run</span><span class="o">.</span><span class="n">conf</span><span class="p">:</span>
<span class="n">params</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">dag_run</span><span class="o">.</span><span class="n">conf</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.render_templates"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.render_templates">[docs]</a> <span class="k">def</span> <span class="nf">render_templates</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">context</span><span class="p">:</span>
<span class="n">context</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_template_context</span><span class="p">()</span>
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;task&#39;</span><span class="p">)</span> <span class="ow">and</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="p">,</span> <span class="s1">&#39;dag&#39;</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">user_defined_macros</span><span class="p">:</span>
<span class="n">context</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">user_defined_macros</span><span class="p">)</span>
<span class="n">rt</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">render_template</span> <span class="c1"># shortcut to method</span>
<span class="k">for</span> <span class="n">attr</span> <span class="ow">in</span> <span class="n">task</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="n">template_fields</span><span class="p">:</span>
<span class="n">content</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="n">task</span><span class="p">,</span> <span class="n">attr</span><span class="p">)</span>
<span class="k">if</span> <span class="n">content</span><span class="p">:</span>
<span class="n">rendered_content</span> <span class="o">=</span> <span class="n">rt</span><span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">content</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span>
<span class="nb">setattr</span><span class="p">(</span><span class="n">task</span><span class="p">,</span> <span class="n">attr</span><span class="p">,</span> <span class="n">rendered_content</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.email_alert"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.email_alert">[docs]</a> <span class="k">def</span> <span class="nf">email_alert</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exception</span><span class="p">):</span>
<span class="n">exception_html</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">exception</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;</span><span class="se">\n</span><span class="s1">&#39;</span><span class="p">,</span> <span class="s1">&#39;&lt;br&gt;&#39;</span><span class="p">)</span>
<span class="n">jinja_context</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_template_context</span><span class="p">()</span>
<span class="c1"># This function is called after changing the state</span>
<span class="c1"># from State.RUNNING, so we need to subtract 1 from self.try_number.</span>
<span class="n">jinja_context</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="nb">dict</span><span class="p">(</span>
<span class="n">exception</span><span class="o">=</span><span class="n">exception</span><span class="p">,</span>
<span class="n">exception_html</span><span class="o">=</span><span class="n">exception_html</span><span class="p">,</span>
<span class="n">try_number</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">-</span> <span class="mi">1</span><span class="p">,</span>
<span class="n">max_tries</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span><span class="p">))</span>
<span class="n">jinja_env</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">get_template_env</span><span class="p">()</span>
<span class="n">default_subject</span> <span class="o">=</span> <span class="s1">&#39;Airflow alert: {{ti}}&#39;</span>
<span class="c1"># For reporting purposes, we report 1-indexed rather than</span>
<span class="c1"># 0-indexed try numbers (i.e. Try 1 instead of</span>
<span class="c1"># Try 0 for the first attempt).</span>
<span class="n">default_html_content</span> <span class="o">=</span> <span class="p">(</span>
<span class="s1">&#39;Try {{try_number}} out of {{max_tries + 1}}&lt;br&gt;&#39;</span>
<span class="s1">&#39;Exception:&lt;br&gt;{{exception_html}}&lt;br&gt;&#39;</span>
<span class="s1">&#39;Log: &lt;a href=&quot;{{ti.log_url}}&quot;&gt;Link&lt;/a&gt;&lt;br&gt;&#39;</span>
<span class="s1">&#39;Host: {{ti.hostname}}&lt;br&gt;&#39;</span>
<span class="s1">&#39;Log file: {{ti.log_filepath}}&lt;br&gt;&#39;</span>
<span class="s1">&#39;Mark success: &lt;a href=&quot;{{ti.mark_success_url}}&quot;&gt;Link&lt;/a&gt;&lt;br&gt;&#39;</span>
<span class="p">)</span>
<span class="k">def</span> <span class="nf">render</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">content</span><span class="p">):</span>
<span class="k">if</span> <span class="n">configuration</span><span class="o">.</span><span class="n">has_option</span><span class="p">(</span><span class="s1">&#39;email&#39;</span><span class="p">,</span> <span class="n">key</span><span class="p">):</span>
<span class="n">path</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;email&#39;</span><span class="p">,</span> <span class="n">key</span><span class="p">)</span>
<span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">path</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span>
<span class="n">content</span> <span class="o">=</span> <span class="n">f</span><span class="o">.</span><span class="n">read</span><span class="p">()</span>
<span class="k">return</span> <span class="n">jinja_env</span><span class="o">.</span><span class="n">from_string</span><span class="p">(</span><span class="n">content</span><span class="p">)</span><span class="o">.</span><span class="n">render</span><span class="p">(</span><span class="o">**</span><span class="n">jinja_context</span><span class="p">)</span>
<span class="n">subject</span> <span class="o">=</span> <span class="n">render</span><span class="p">(</span><span class="s1">&#39;subject_template&#39;</span><span class="p">,</span> <span class="n">default_subject</span><span class="p">)</span>
<span class="n">html_content</span> <span class="o">=</span> <span class="n">render</span><span class="p">(</span><span class="s1">&#39;html_content_template&#39;</span><span class="p">,</span> <span class="n">default_html_content</span><span class="p">)</span>
<span class="n">send_email</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">email</span><span class="p">,</span> <span class="n">subject</span><span class="p">,</span> <span class="n">html_content</span><span class="p">)</span></div>
<div class="viewcode-block" id="TaskInstance.set_duration"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.set_duration">[docs]</a> <span class="k">def</span> <span class="nf">set_duration</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">duration</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">)</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">duration</span> <span class="o">=</span> <span class="kc">None</span></div>
<!-- Highlighted listing of TaskInstance.xcom_push: raises ValueError when a supplied execution_date is earlier than self.execution_date, then calls XCom.set with this task_id and dag_id, using the supplied execution_date or falling back to self.execution_date. The [docs] anchor links back to the API reference entry. --><div class="viewcode-block" id="TaskInstance.xcom_push"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.xcom_push">[docs]</a> <span class="k">def</span> <span class="nf">xcom_push</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">key</span><span class="p">,</span>
<span class="n">value</span><span class="p">,</span>
<span class="n">execution_date</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Make an XCom available for tasks to pull.</span>
<span class="sd"> :param key: A key for the XCom</span>
<span class="sd"> :type key: str</span>
<span class="sd"> :param value: A value for the XCom. The value is pickled and stored</span>
<span class="sd"> in the database.</span>
<span class="sd"> :type value: any pickleable object</span>
<span class="sd"> :param execution_date: if provided, the XCom will not be visible until</span>
<span class="sd"> this date. This can be used, for example, to send a message to a</span>
<span class="sd"> task on a future date without it being immediately visible.</span>
<span class="sd"> :type execution_date: datetime</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">execution_date</span> <span class="ow">and</span> <span class="n">execution_date</span> <span class="o">&lt;</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">:</span>
<span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span>
<span class="s1">&#39;execution_date can not be in the past (current &#39;</span>
<span class="s1">&#39;execution_date is </span><span class="si">{}</span><span class="s1">; received </span><span class="si">{}</span><span class="s1">)&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">))</span>
<span class="n">XCom</span><span class="o">.</span><span class="n">set</span><span class="p">(</span>
<span class="n">key</span><span class="o">=</span><span class="n">key</span><span class="p">,</span>
<span class="n">value</span><span class="o">=</span><span class="n">value</span><span class="p">,</span>
<span class="n">task_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span>
<span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">execution_date</span><span class="o">=</span><span class="n">execution_date</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span></div>
<!-- Highlighted listing of TaskInstance.xcom_pull: dag_id defaults to self.dag_id; a functools.partial over XCom.get_one is bound to self.execution_date, key, dag_id and include_prior_dates; when is_container(task_ids) is true a tuple with one result per task id is returned, otherwise the single result for task_ids. The [docs] anchor links back to the API reference entry. --><div class="viewcode-block" id="TaskInstance.xcom_pull"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.xcom_pull">[docs]</a> <span class="k">def</span> <span class="nf">xcom_pull</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">task_ids</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">dag_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">key</span><span class="o">=</span><span class="n">XCOM_RETURN_KEY</span><span class="p">,</span>
<span class="n">include_prior_dates</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Pull XComs that optionally meet certain criteria.</span>
<span class="sd"> The default value for `key` limits the search to XComs</span>
<span class="sd"> that were returned by other tasks (as opposed to those that were pushed</span>
<span class="sd"> manually). To remove this filter, pass key=None (or any desired value).</span>
<span class="sd"> If a single task_id string is provided, the result is the value of the</span>
<span class="sd"> most recent matching XCom from that task_id. If multiple task_ids are</span>
<span class="sd"> provided, a tuple of matching values is returned. None is returned</span>
<span class="sd"> whenever no matches are found.</span>
<span class="sd"> :param key: A key for the XCom. If provided, only XComs with matching</span>
<span class="sd"> keys will be returned. The default key is &#39;return_value&#39;, also</span>
<span class="sd"> available as a constant XCOM_RETURN_KEY. This key is automatically</span>
<span class="sd"> given to XComs returned by tasks (as opposed to being pushed</span>
<span class="sd"> manually). To remove the filter, pass key=None.</span>
<span class="sd"> :type key: str</span>
<span class="sd"> :param task_ids: Only XComs from tasks with matching ids will be</span>
<span class="sd"> pulled. Can pass None to remove the filter.</span>
<span class="sd"> :type task_ids: str or iterable of strings (representing task_ids)</span>
<span class="sd"> :param dag_id: If provided, only pulls XComs from this DAG.</span>
<span class="sd"> If None (default), the DAG of the calling task is used.</span>
<span class="sd"> :type dag_id: str</span>
<span class="sd"> :param include_prior_dates: If False, only XComs from the current</span>
<span class="sd"> execution_date are returned. If True, XComs from previous dates</span>
<span class="sd"> are returned as well.</span>
<span class="sd"> :type include_prior_dates: bool</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">dag_id</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="n">dag_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span>
<span class="n">pull_fn</span> <span class="o">=</span> <span class="n">functools</span><span class="o">.</span><span class="n">partial</span><span class="p">(</span>
<span class="n">XCom</span><span class="o">.</span><span class="n">get_one</span><span class="p">,</span>
<span class="n">execution_date</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span>
<span class="n">key</span><span class="o">=</span><span class="n">key</span><span class="p">,</span>
<span class="n">dag_id</span><span class="o">=</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">include_prior_dates</span><span class="o">=</span><span class="n">include_prior_dates</span><span class="p">)</span>
<span class="k">if</span> <span class="n">is_container</span><span class="p">(</span><span class="n">task_ids</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">pull_fn</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="n">t</span><span class="p">)</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">task_ids</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="n">pull_fn</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="n">task_ids</span><span class="p">)</span></div>
<!-- Highlighted listing of TaskInstance.get_num_running_task_instances: counts TaskInstance rows matching this dag_id and task_id whose state equals State.RUNNING. The viewcode-block wrapper below is closed after the final ".count()" line so the whole method body sits inside the anchor target; the decorator line stays outside the wrapper, as generated. --><span class="nd">@provide_session</span>
<div class="viewcode-block" id="TaskInstance.get_num_running_task_instances"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.get_num_running_task_instances">[docs]</a> <span class="k">def</span> <span class="nf">get_num_running_task_instances</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">):</span>
<span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span>
<span class="k">return</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">TI</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span>
<span class="n">TI</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span>
<span class="p">)</span><span class="o">.</span><span class="n">count</span><span class="p">()</span></div>
<!-- Highlighted listing of TaskInstance.init_run_context: stores the raw flag on the instance and calls self._set_context(self). The second closing div on the last line ends an enclosing block opened earlier in the page (presumably the TaskInstance class listing; confirm against the full file). --><div class="viewcode-block" id="TaskInstance.init_run_context"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.init_run_context">[docs]</a> <span class="k">def</span> <span class="nf">init_run_context</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">raw</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Sets the log context.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">raw</span> <span class="o">=</span> <span class="n">raw</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_set_context</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span></div></div>
</pre></div>
</div>
</div>
<!-- Page footer: a thematic break, the (currently empty) content-info slot, and the build credits. All credit links use HTTPS. -->
<footer>
<hr/>
<div role="contentinfo">
<p>
</p>
</div>
Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script>
  // Passing a function to jQuery() registers it as a DOM-ready callback,
  // so the theme navigation is only switched on once the document has been parsed.
  jQuery(function enableThemeNavigation() {
    // NOTE(review): the `true` argument presumably turns on the theme's sticky
    // sidebar navigation; confirm against sphinx_rtd_theme's theme.js.
    SphinxRtdTheme.Navigation.enable(true);
  });
</script>
</body>
</html>