| |
| |
| <!DOCTYPE html> |
| <!-- |
| JavaScript to render AIRFLOW-XXX and PR references in text |
| as HTML links. |
| |
| Overrides extrahead block from sphinx_rtd_theme |
| https://www.sphinx-doc.org/en/master/templating.html |
| |
| This comment is kept after the doctype on purpose: older Internet Explorer |
| versions (targeted by the conditional comments below) switch to quirks mode |
| when a comment precedes the doctype. |
| --> |
| |
| |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>airflow.models.taskinstance — Airflow Documentation</title> |
| |
| |
| |
| |
| <link rel="shortcut icon" href="../../../_static/pin_32.png"/> |
| |
| |
| |
| |
| |
| <script type="text/javascript" src="../../../_static/js/modernizr.min.js"></script> |
| |
| |
| <script type="text/javascript" id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script> |
| <script type="text/javascript" src="../../../_static/jquery.js"></script> |
| <script type="text/javascript" src="../../../_static/underscore.js"></script> |
| <script type="text/javascript" src="../../../_static/doctools.js"></script> |
| <script type="text/javascript" src="../../../_static/language_data.js"></script> |
| |
| <script type="text/javascript" src="../../../_static/js/theme.js"></script> |
| |
| |
| |
| |
| <link rel="stylesheet" href="../../../_static/css/theme.css" type="text/css" /> |
| <link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" /> |
| <link rel="stylesheet" href="../../../_static/graphviz.css" type="text/css" /> |
| <link rel="index" title="Index" href="../../../genindex.html" /> |
| <link rel="search" title="Search" href="../../../search.html" /> |
| |
| <script> |
| // Render "[AIRFLOW-123]" and "(#456)" references inside #changelog as links. |
| // Only text nodes are rewritten, so attribute values and existing links are |
| // never touched and the rest of the subtree is not re-parsed. |
| document.addEventListener('DOMContentLoaded', function() { |
|   var el = document.getElementById('changelog'); |
|   if (el === null) { |
|     return; |
|   } |
|   // Group 1 captures a Jira key, group 2 captures a pull request number. |
|   var refPattern = /\[(AIRFLOW-\d+)\]|\(#(\d+)\)/g; |
|   // Merge adjacent text nodes so a reference is never split across two nodes. |
|   el.normalize(); |
|   var walker = document.createTreeWalker(el, NodeFilter.SHOW_TEXT, null, false); |
|   var pending = []; |
|   var node; |
|   // Collect first, replace afterwards: mutating the tree while walking it |
|   // would invalidate the walker's position. |
|   while ((node = walker.nextNode()) !== null) { |
|     // Text that is already part of a link is skipped to avoid nested anchors. |
|     if (node.parentNode.closest('a') === null && node.nodeValue.search(refPattern) !== -1) { |
|       pending.push(node); |
|     } |
|   } |
|   pending.forEach(function(textNode) { |
|     var text = textNode.nodeValue; |
|     var fragment = document.createDocumentFragment(); |
|     var cursor = 0; |
|     var match; |
|     refPattern.lastIndex = 0; |
|     while ((match = refPattern.exec(text)) !== null) { |
|       if (match.index > cursor) { |
|         fragment.appendChild(document.createTextNode(text.slice(cursor, match.index))); |
|       } |
|       var link = document.createElement('a'); |
|       link.href = match[1] |
|         ? 'https://issues.apache.org/jira/browse/' + match[1] |
|         : 'https://github.com/apache/airflow/pull/' + match[2]; |
|       // The visible text stays exactly what was matched, brackets included. |
|       link.textContent = match[0]; |
|       fragment.appendChild(link); |
|       cursor = refPattern.lastIndex; |
|     } |
|     if (cursor < text.length) { |
|       fragment.appendChild(document.createTextNode(text.slice(cursor))); |
|     } |
|     textNode.parentNode.replaceChild(fragment, textNode); |
|   }); |
| }); |
| </script> |
| <script type="text/javascript"> |
| // Reuse the global _gaq command queue if it already exists, otherwise start |
| // an empty one, then enqueue the account id followed by a page-view command. |
| var _gaq = _gaq || []; |
| _gaq.push( |
|   ['_setAccount', 'UA-140539454-1'], |
|   ['_trackPageview'] |
| ); |
| </script> |
| <style> |
| /* Header bar; the last rule in this group removes the gap to a |
|    .highlight-python block that directly follows it. */ |
| .example-header { |
|   margin-bottom: 0; |
|   padding: 8px 16px; |
|   background: #9AAA7A; |
|   position: relative; |
| } |
| /* Modifier with a wider right padding (presumably room for the |
|    absolutely positioned button below). */ |
| .example-header--with-button { |
|   padding-right: 166px; |
| } |
| /* Clearfix: an empty, table-display pseudo-element that clears floats. */ |
| .example-header:after { |
|   clear: both; |
|   display: table; |
|   content: ''; |
| } |
| /* Title text; scrolls horizontally instead of overflowing. */ |
| .example-title { |
|   overflow-x: auto; |
|   color: white; |
|   margin-right: 16px; |
|   padding: 4px; |
|   display: block; |
| } |
| /* Offset from the top-right corner of the nearest positioned ancestor. */ |
| .example-header-button { |
|   position: absolute; |
|   top: 8px; |
|   right: 16px; |
| } |
| .example-header + .highlight-python { |
|   margin-top: 0 !important; |
| } |
| /* Button-like control: flat grey surface with a drop shadow and |
|    upper-case, single-line, ellipsized label. */ |
| .viewcode-button { |
|   display: inline-block; |
|   padding: 8px 16px; |
|   border: 0; |
|   margin: 0; |
|   outline: 0; |
|   border-radius: 2px; |
|   /* Prefixed and unprefixed shadows must match; the blur used to differ (5px vs 6px). */ |
|   -webkit-box-shadow: 0 3px 6px 0 rgba(0,0,0,.3); |
|   box-shadow: 0 3px 6px 0 rgba(0,0,0,.3); |
|   color: #404040; |
|   background-color: #e7e7e7; |
|   cursor: pointer; |
|   font-size: 16px; |
|   font-weight: 500; |
|   line-height: 1; |
|   text-decoration: none; |
|   text-overflow: ellipsis; |
|   overflow: hidden; |
|   text-transform: uppercase; |
|   -webkit-transition: background-color .2s; |
|   transition: background-color .2s; |
|   vertical-align: middle; |
|   white-space: nowrap; |
| } |
| /* Visited state keeps the same text colour as the default state. */ |
| .viewcode-button:visited { |
|   color: #404040; |
| } |
| .viewcode-button:hover, .viewcode-button:focus { |
|   color: #404040; |
|   background-color: #d6d6d6; |
| } |
| /* The base rule resets the outline, so keyboard focus gets an explicit ring. |
|    Kept as a separate rule so browsers without :focus-visible simply ignore it. */ |
| .viewcode-button:focus-visible { |
|   outline: 2px solid #404040; |
|   outline-offset: 2px; |
| } |
| </style> |
| |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search" > |
| |
| |
| |
| <a href="../../../index.html" class="icon icon-home"> Airflow |
| |
| |
| |
| </a> |
| |
| |
| |
| |
| <div class="version"> |
| 1.10.6 |
| </div> |
| |
| |
| |
| |
| <div role="search"> |
| <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get"> |
| <!-- The placeholder is only a hint and disappears on input; aria-label gives the field an accessible name. --> |
| <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| |
| |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| |
| |
| |
| |
| |
| |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../project.html">Project</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../license.html">License</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../start.html">Quick Start</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../installation.html">Installation</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../tutorial.html">Tutorial</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../howto/index.html">How-to Guides</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../ui.html">UI / Screenshots</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../concepts.html">Concepts</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../profiling.html">Data Profiling</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../cli.html">Command Line Interface Reference</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../scheduler.html">Scheduling &amp; Triggers</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../plugins.html">Plugins</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../security.html">Security</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../timezone.html">Time zones</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../api.html">REST API Reference</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../integration.html">Integration</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../metrics.html">Metrics</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../errors.html">Error Tracking</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../kubernetes.html">Kubernetes</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../lineage.html">Lineage</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../changelog.html">Changelog</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../faq.html">FAQ</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../macros.html">Macros reference</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../_api/index.html">Python API Reference</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../privacy_notice.html">Privacy Notice</a></li> |
| </ul> |
| <p class="caption"><span class="caption-text">References</span></p> |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../_api/index.html">Python API</a></li> |
| </ul> |
| |
| |
| |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../../../index.html">Airflow</a> |
| |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| |
| <ul class="wy-breadcrumbs"> |
| |
| <li><a href="../../../index.html">Docs</a> »</li> |
| |
| <li><a href="../../index.html">Module code</a> »</li> |
| |
| <li><a href="../models.html">airflow.models</a> »</li> |
| |
| <li>airflow.models.taskinstance</li> |
| |
| |
| <li class="wy-breadcrumbs-aside"> |
| |
| </li> |
| |
| </ul> |
| |
| |
| <hr/> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for airflow.models.taskinstance</h1><div class="highlight"><pre> |
| <span></span><span class="c1"># -*- coding: utf-8 -*-</span> |
| <span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span> |
| <span class="c1"># or more contributor license agreements. See the NOTICE file</span> |
| <span class="c1"># distributed with this work for additional information</span> |
| <span class="c1"># regarding copyright ownership. The ASF licenses this file</span> |
| <span class="c1"># to you under the Apache License, Version 2.0 (the</span> |
| <span class="c1"># "License"); you may not use this file except in compliance</span> |
| <span class="c1"># with the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing,</span> |
| <span class="c1"># software distributed under the License is distributed on an</span> |
| <span class="c1"># "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span> |
| <span class="c1"># KIND, either express or implied. See the License for the</span> |
| <span class="c1"># specific language governing permissions and limitations</span> |
| <span class="c1"># under the License.</span> |
| |
| <span class="kn">import</span> <span class="nn">copy</span> |
| <span class="kn">import</span> <span class="nn">functools</span> |
| <span class="kn">import</span> <span class="nn">getpass</span> |
| <span class="kn">import</span> <span class="nn">hashlib</span> |
| <span class="kn">import</span> <span class="nn">logging</span> |
| <span class="kn">import</span> <span class="nn">math</span> |
| <span class="kn">import</span> <span class="nn">os</span> |
| <span class="kn">import</span> <span class="nn">signal</span> |
| <span class="kn">import</span> <span class="nn">time</span> |
| <span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">timedelta</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="k">import</span> <span class="n">Optional</span> |
| <span class="kn">from</span> <span class="nn">urllib.parse</span> <span class="k">import</span> <span class="n">quote</span> |
| |
| <span class="kn">import</span> <span class="nn">dill</span> |
| <span class="kn">import</span> <span class="nn">lazy_object_proxy</span> |
| <span class="kn">import</span> <span class="nn">pendulum</span> |
| <span class="kn">from</span> <span class="nn">six.moves.urllib.parse</span> <span class="k">import</span> <span class="n">quote_plus</span> |
| <span class="kn">from</span> <span class="nn">sqlalchemy</span> <span class="k">import</span> <span class="n">Column</span><span class="p">,</span> <span class="n">String</span><span class="p">,</span> <span class="n">Float</span><span class="p">,</span> <span class="n">Integer</span><span class="p">,</span> <span class="n">PickleType</span><span class="p">,</span> <span class="n">Index</span><span class="p">,</span> <span class="n">func</span> |
| <span class="kn">from</span> <span class="nn">sqlalchemy.orm</span> <span class="k">import</span> <span class="n">reconstructor</span> |
| <span class="kn">from</span> <span class="nn">sqlalchemy.orm.session</span> <span class="k">import</span> <span class="n">Session</span> |
| |
| <span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">settings</span> |
| <span class="kn">from</span> <span class="nn">airflow.configuration</span> <span class="k">import</span> <span class="n">conf</span> |
| <span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="p">(</span> |
| <span class="n">AirflowException</span><span class="p">,</span> <span class="n">AirflowTaskTimeout</span><span class="p">,</span> <span class="n">AirflowSkipException</span><span class="p">,</span> <span class="n">AirflowRescheduleException</span> |
| <span class="p">)</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.base</span> <span class="k">import</span> <span class="n">Base</span><span class="p">,</span> <span class="n">ID_LEN</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.log</span> <span class="k">import</span> <span class="n">Log</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.pool</span> <span class="k">import</span> <span class="n">Pool</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.taskfail</span> <span class="k">import</span> <span class="n">TaskFail</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.taskreschedule</span> <span class="k">import</span> <span class="n">TaskReschedule</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.variable</span> <span class="k">import</span> <span class="n">Variable</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.xcom</span> <span class="k">import</span> <span class="n">XCom</span><span class="p">,</span> <span class="n">XCOM_RETURN_KEY</span> |
| <span class="kn">from</span> <span class="nn">airflow.sentry</span> <span class="k">import</span> <span class="n">Sentry</span> |
| <span class="kn">from</span> <span class="nn">airflow.settings</span> <span class="k">import</span> <span class="n">Stats</span> |
| <span class="kn">from</span> <span class="nn">airflow.ti_deps.dep_context</span> <span class="k">import</span> <span class="n">DepContext</span><span class="p">,</span> <span class="n">REQUEUEABLE_DEPS</span><span class="p">,</span> <span class="n">RUNNING_DEPS</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils</span> <span class="k">import</span> <span class="n">timezone</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.db</span> <span class="k">import</span> <span class="n">provide_session</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.email</span> <span class="k">import</span> <span class="n">send_email</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.helpers</span> <span class="k">import</span> <span class="n">is_container</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.log.logging_mixin</span> <span class="k">import</span> <span class="n">LoggingMixin</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.net</span> <span class="k">import</span> <span class="n">get_hostname</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.sqlalchemy</span> <span class="k">import</span> <span class="n">UtcDateTime</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.timeout</span> <span class="k">import</span> <span class="n">timeout</span> |
| |
| |
| <div class="viewcode-block" id="clear_task_instances"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.clear_task_instances">[docs]</a><span class="k">def</span> <span class="nf">clear_task_instances</span><span class="p">(</span><span class="n">tis</span><span class="p">,</span> |
| <span class="n">session</span><span class="p">,</span> |
| <span class="n">activate_dag_runs</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">dag</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Clears a set of task instances, but makes sure the running ones</span> |
| <span class="sd"> get killed.</span> |
| |
| <span class="sd"> :param tis: a list of task instances</span> |
| <span class="sd"> :param session: current session</span> |
| <span class="sd"> :param activate_dag_runs: flag to check for active dag run</span> |
| <span class="sd"> :param dag: DAG object</span> |
| <span class="sd"> """</span> |
| <span class="n">job_ids</span> <span class="o">=</span> <span class="p">[]</span> |
| <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">ti</span><span class="o">.</span><span class="n">job_id</span><span class="p">:</span> |
| <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SHUTDOWN</span> |
| <span class="n">job_ids</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">job_id</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">task_id</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">task_id</span> |
| <span class="k">if</span> <span class="n">dag</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">has_task</span><span class="p">(</span><span class="n">task_id</span><span class="p">):</span> |
| <span class="n">task</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">task_id</span><span class="p">)</span> |
| <span class="n">task_retries</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">retries</span> |
| <span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">try_number</span> <span class="o">+</span> <span class="n">task_retries</span> <span class="o">-</span> <span class="mi">1</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="c1"># Ignore errors when updating max_tries if dag is None or</span> |
| <span class="c1"># task not found in dag since database records could be</span> |
| <span class="c1"># outdated. We make max_tries the maximum value of its</span> |
| <span class="c1"># original max_tries or the current task try number.</span> |
| <span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="nb">max</span><span class="p">(</span><span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span><span class="p">,</span> <span class="n">ti</span><span class="o">.</span><span class="n">try_number</span> <span class="o">-</span> <span class="mi">1</span><span class="p">)</span> |
| <span class="n">ti</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">NONE</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="n">ti</span><span class="p">)</span> |
| <span class="c1"># Clear all reschedules related to the ti to clear</span> |
| <span class="n">TR</span> <span class="o">=</span> <span class="n">TaskReschedule</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TR</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> |
| <span class="n">TR</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">ti</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">TR</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="n">ti</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> |
| <span class="n">TR</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="n">ti</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> |
| <span class="n">TR</span><span class="o">.</span><span class="n">try_number</span> <span class="o">==</span> <span class="n">ti</span><span class="o">.</span><span class="n">try_number</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">delete</span><span class="p">()</span> |
| |
| <span class="k">if</span> <span class="n">job_ids</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">airflow.jobs</span> <span class="k">import</span> <span class="n">BaseJob</span> <span class="k">as</span> <span class="n">BJ</span> |
| <span class="k">for</span> <span class="n">job</span> <span class="ow">in</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">BJ</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">BJ</span><span class="o">.</span><span class="n">id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">job_ids</span><span class="p">))</span><span class="o">.</span><span class="n">all</span><span class="p">():</span> |
| <span class="n">job</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SHUTDOWN</span> |
| |
| <span class="k">if</span> <span class="n">activate_dag_runs</span> <span class="ow">and</span> <span class="n">tis</span><span class="p">:</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.dagrun</span> <span class="k">import</span> <span class="n">DagRun</span> <span class="c1"># Avoid circular import</span> |
| <span class="n">drs</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagRun</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> |
| <span class="n">DagRun</span><span class="o">.</span><span class="n">dag_id</span><span class="o">.</span><span class="n">in_</span><span class="p">({</span><span class="n">ti</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}),</span> |
| <span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">in_</span><span class="p">({</span><span class="n">ti</span><span class="o">.</span><span class="n">execution_date</span> <span class="k">for</span> <span class="n">ti</span> <span class="ow">in</span> <span class="n">tis</span><span class="p">}),</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span> |
| <span class="k">for</span> <span class="n">dr</span> <span class="ow">in</span> <span class="n">drs</span><span class="p">:</span> |
| <span class="n">dr</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span> |
| <span class="n">dr</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span></div> |
| |
| |
| <div class="viewcode-block" id="TaskInstance"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance">[docs]</a><span class="k">class</span> <span class="nc">TaskInstance</span><span class="p">(</span><span class="n">Base</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Task instances store the state of a task instance. This table is the</span> |
| <span class="sd"> authority and single source of truth around what tasks have run and the</span> |
| <span class="sd"> state they are in.</span> |
| |
| <span class="sd"> The SqlAlchemy model doesn't have a SqlAlchemy foreign key to the task or</span> |
| <span class="sd"> dag model deliberately to have more control over transactions.</span> |
| |
| <span class="sd"> Database transactions on this table should insure double triggers and</span> |
| <span class="sd"> any confusion around what task instances are or aren't ready to run</span> |
| <span class="sd"> even while multiple schedulers may be firing task instances.</span> |
| <span class="sd"> """</span> |
| |
| <div class="viewcode-block" id="TaskInstance.__tablename__"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.__tablename__">[docs]</a> <span class="n">__tablename__</span> <span class="o">=</span> <span class="s2">"task_instance"</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.task_id"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.task_id">[docs]</a> <span class="n">task_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="n">ID_LEN</span><span class="p">),</span> <span class="n">primary_key</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.dag_id"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.dag_id">[docs]</a> <span class="n">dag_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="n">ID_LEN</span><span class="p">),</span> <span class="n">primary_key</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.execution_date"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.execution_date">[docs]</a> <span class="n">execution_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">UtcDateTime</span><span class="p">,</span> <span class="n">primary_key</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.start_date"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.start_date">[docs]</a> <span class="n">start_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">UtcDateTime</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.end_date"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.end_date">[docs]</a> <span class="n">end_date</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">UtcDateTime</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.duration"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.duration">[docs]</a> <span class="n">duration</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Float</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.state"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.state">[docs]</a> <span class="n">state</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">20</span><span class="p">))</span></div> |
| <div class="viewcode-block" id="TaskInstance._try_number"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance._try_number">[docs]</a> <span class="n">_try_number</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="s1">'try_number'</span><span class="p">,</span> <span class="n">Integer</span><span class="p">,</span> <span class="n">default</span><span class="o">=</span><span class="mi">0</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.max_tries"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.max_tries">[docs]</a> <span class="n">max_tries</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.hostname"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.hostname">[docs]</a> <span class="n">hostname</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">1000</span><span class="p">))</span></div> |
| <div class="viewcode-block" id="TaskInstance.unixname"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.unixname">[docs]</a> <span class="n">unixname</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">1000</span><span class="p">))</span></div> |
| <div class="viewcode-block" id="TaskInstance.job_id"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.job_id">[docs]</a> <span class="n">job_id</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.pool"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.pool">[docs]</a> <span class="n">pool</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">50</span><span class="p">),</span> <span class="n">nullable</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.queue"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.queue">[docs]</a> <span class="n">queue</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">256</span><span class="p">))</span></div> |
| <div class="viewcode-block" id="TaskInstance.priority_weight"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.priority_weight">[docs]</a> <span class="n">priority_weight</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.operator"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.operator">[docs]</a> <span class="n">operator</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">String</span><span class="p">(</span><span class="mi">1000</span><span class="p">))</span></div> |
| <div class="viewcode-block" id="TaskInstance.queued_dttm"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.queued_dttm">[docs]</a> <span class="n">queued_dttm</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">UtcDateTime</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.pid"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.pid">[docs]</a> <span class="n">pid</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">Integer</span><span class="p">)</span></div> |
| <div class="viewcode-block" id="TaskInstance.executor_config"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.executor_config">[docs]</a> <span class="n">executor_config</span> <span class="o">=</span> <span class="n">Column</span><span class="p">(</span><span class="n">PickleType</span><span class="p">(</span><span class="n">pickler</span><span class="o">=</span><span class="n">dill</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.__table_args__"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.__table_args__">[docs]</a> <span class="n">__table_args__</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">Index</span><span class="p">(</span><span class="s1">'ti_dag_state'</span><span class="p">,</span> <span class="n">dag_id</span><span class="p">,</span> <span class="n">state</span><span class="p">),</span> |
| <span class="n">Index</span><span class="p">(</span><span class="s1">'ti_dag_date'</span><span class="p">,</span> <span class="n">dag_id</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">),</span> |
| <span class="n">Index</span><span class="p">(</span><span class="s1">'ti_state'</span><span class="p">,</span> <span class="n">state</span><span class="p">),</span> |
| <span class="n">Index</span><span class="p">(</span><span class="s1">'ti_state_lkp'</span><span class="p">,</span> <span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">,</span> <span class="n">state</span><span class="p">),</span> |
| <span class="n">Index</span><span class="p">(</span><span class="s1">'ti_pool'</span><span class="p">,</span> <span class="n">pool</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">priority_weight</span><span class="p">),</span> |
| <span class="n">Index</span><span class="p">(</span><span class="s1">'ti_job_id'</span><span class="p">,</span> <span class="n">job_id</span><span class="p">),</span></div> |
| <span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">,</span> <span class="n">state</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">dag_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task</span> <span class="o">=</span> <span class="n">task</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_log</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s2">"airflow.task"</span><span class="p">)</span> |
| |
| <span class="c1"># make sure we have a localized execution_date stored in UTC</span> |
| <span class="k">if</span> <span class="n">execution_date</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">timezone</span><span class="o">.</span><span class="n">is_localized</span><span class="p">(</span><span class="n">execution_date</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">"execution date </span><span class="si">%s</span><span class="s2"> has no timezone information. Using "</span> |
| <span class="s2">"default from dag or system"</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span> |
| <span class="n">execution_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">make_aware</span><span class="p">(</span><span class="n">execution_date</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">timezone</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">execution_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">make_aware</span><span class="p">(</span><span class="n">execution_date</span><span class="p">)</span> |
| |
| <span class="n">execution_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="n">execution_date</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">=</span> <span class="n">execution_date</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">queue</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">queue</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pool</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">pool</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">priority_weight_total</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">=</span> <span class="mi">0</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retries</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">unixname</span> <span class="o">=</span> <span class="n">getpass</span><span class="o">.</span><span class="n">getuser</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">run_as_user</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">run_as_user</span> |
| <span class="k">if</span> <span class="n">state</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">hostname</span> <span class="o">=</span> <span class="s1">''</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">executor_config</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">executor_config</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">init_on_load</span><span class="p">()</span> |
| <span class="c1"># Whether this TaskInstance is currently running within `airflow run --raw`.</span> |
| <span class="c1"># Not persisted to the database so only valid for the current process</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">raw</span> <span class="o">=</span> <span class="kc">False</span> |
| |
| <span class="nd">@reconstructor</span> |
| <div class="viewcode-block" id="TaskInstance.init_on_load"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.init_on_load">[docs]</a> <span class="k">def</span> <span class="nf">init_on_load</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">""" Initialize the attributes that aren't stored in the DB. """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_mode</span> <span class="o">=</span> <span class="kc">False</span> <span class="c1"># can be changed when calling 'run'</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="TaskInstance.try_number"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.try_number">[docs]</a> <span class="k">def</span> <span class="nf">try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Return the try number that this task instance will have when it is</span> |
| <span class="sd"> actually run.</span> |
| |
| <span class="sd"> If the TI is currently running, this will match the column in the</span> |
| <span class="sd"> database; in all other cases this will be incremented.</span> |
| <span class="sd"> """</span> |
| <span class="c1"># This is designed so that task logs end up in the right file.</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">+</span> <span class="mi">1</span></div> |
| |
| <span class="nd">@try_number</span><span class="o">.</span><span class="n">setter</span> |
| <span class="k">def</span> <span class="nf">try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">=</span> <span class="n">value</span> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="TaskInstance.next_try_number"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.next_try_number">[docs]</a> <span class="k">def</span> <span class="nf">next_try_number</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">+</span> <span class="mi">1</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.command"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.command">[docs]</a> <span class="k">def</span> <span class="nf">command</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_all_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_task_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">local</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">pickle_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">raw</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">cfg_path</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Returns a command that can be executed anywhere where airflow is</span> |
| <span class="sd"> installed. This command is part of the message sent to executors by</span> |
| <span class="sd"> the orchestrator.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="s2">" "</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">command_as_list</span><span class="p">(</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span> |
| <span class="n">ignore_all_deps</span><span class="o">=</span><span class="n">ignore_all_deps</span><span class="p">,</span> |
| <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span> |
| <span class="n">ignore_task_deps</span><span class="o">=</span><span class="n">ignore_task_deps</span><span class="p">,</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">,</span> |
| <span class="n">local</span><span class="o">=</span><span class="n">local</span><span class="p">,</span> |
| <span class="n">pickle_id</span><span class="o">=</span><span class="n">pickle_id</span><span class="p">,</span> |
| <span class="n">raw</span><span class="o">=</span><span class="n">raw</span><span class="p">,</span> |
| <span class="n">job_id</span><span class="o">=</span><span class="n">job_id</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span> |
| <span class="n">cfg_path</span><span class="o">=</span><span class="n">cfg_path</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.command_as_list"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.command_as_list">[docs]</a> <span class="k">def</span> <span class="nf">command_as_list</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_all_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_task_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">local</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">pickle_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">raw</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">cfg_path</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Returns a command that can be executed anywhere where airflow is</span> |
| <span class="sd"> installed. This command is part of the message sent to executors by</span> |
| <span class="sd"> the orchestrator.</span> |
| <span class="sd"> """</span> |
| <span class="n">dag</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span> |
| |
| <span class="n">should_pass_filepath</span> <span class="o">=</span> <span class="ow">not</span> <span class="n">pickle_id</span> <span class="ow">and</span> <span class="n">dag</span> |
| <span class="k">if</span> <span class="n">should_pass_filepath</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span> <span class="o">!=</span> <span class="n">dag</span><span class="o">.</span><span class="n">filepath</span><span class="p">:</span> |
| <span class="n">path</span> <span class="o">=</span> <span class="s2">"DAGS_FOLDER/</span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dag</span><span class="o">.</span><span class="n">filepath</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">should_pass_filepath</span> <span class="ow">and</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span><span class="p">:</span> |
| <span class="n">path</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">full_filepath</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">path</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">return</span> <span class="n">TaskInstance</span><span class="o">.</span><span class="n">generate_command</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span> |
| <span class="n">ignore_all_deps</span><span class="o">=</span><span class="n">ignore_all_deps</span><span class="p">,</span> |
| <span class="n">ignore_task_deps</span><span class="o">=</span><span class="n">ignore_task_deps</span><span class="p">,</span> |
| <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">,</span> |
| <span class="n">local</span><span class="o">=</span><span class="n">local</span><span class="p">,</span> |
| <span class="n">pickle_id</span><span class="o">=</span><span class="n">pickle_id</span><span class="p">,</span> |
| <span class="n">file_path</span><span class="o">=</span><span class="n">path</span><span class="p">,</span> |
| <span class="n">raw</span><span class="o">=</span><span class="n">raw</span><span class="p">,</span> |
| <span class="n">job_id</span><span class="o">=</span><span class="n">job_id</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span> |
| <span class="n">cfg_path</span><span class="o">=</span><span class="n">cfg_path</span><span class="p">)</span></div> |
| |
| <span class="nd">@staticmethod</span> |
| <div class="viewcode-block" id="TaskInstance.generate_command"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.generate_command">[docs]</a> <span class="k">def</span> <span class="nf">generate_command</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">task_id</span><span class="p">,</span> |
| <span class="n">execution_date</span><span class="p">,</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_all_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_task_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">local</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">pickle_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">file_path</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">raw</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">cfg_path</span><span class="o">=</span><span class="kc">None</span> |
| <span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Generates the shell command required to execute this task instance.</span> |
| |
| <span class="sd"> :param dag_id: DAG ID</span> |
| <span class="sd"> :type dag_id: unicode</span> |
| <span class="sd"> :param task_id: Task ID</span> |
| <span class="sd"> :type task_id: unicode</span> |
| <span class="sd"> :param execution_date: Execution date for the task</span> |
| <span class="sd"> :type execution_date: datetime.datetime</span> |
| <span class="sd"> :param mark_success: Whether to mark the task as successful</span> |
| <span class="sd"> :type mark_success: bool</span> |
| <span class="sd"> :param ignore_all_deps: Ignore all ignorable dependencies.</span> |
| <span class="sd"> Overrides the other ignore_* parameters.</span> |
| <span class="sd"> :type ignore_all_deps: bool</span> |
| <span class="sd"> :param ignore_depends_on_past: Ignore depends_on_past parameter of DAGs</span> |
| <span class="sd"> (e.g. for Backfills)</span> |
| <span class="sd"> :type ignore_depends_on_past: bool</span> |
| <span class="sd"> :param ignore_task_deps: Ignore task-specific dependencies such as depends_on_past</span> |
| <span class="sd"> and trigger rule</span> |
| <span class="sd"> :type ignore_task_deps: bool</span> |
| <span class="sd"> :param ignore_ti_state: Ignore the task instance's previous failure/success</span> |
| <span class="sd"> :type ignore_ti_state: bool</span> |
| <span class="sd"> :param local: Whether to run the task locally</span> |
| <span class="sd"> :type local: bool</span> |
| <span class="sd"> :param pickle_id: If the DAG was serialized to the DB, the ID</span> |
| <span class="sd"> associated with the pickled DAG</span> |
| <span class="sd"> :type pickle_id: unicode</span> |
| <span class="sd"> :param file_path: path to the file containing the DAG definition</span> |
| <span class="sd"> :param raw: raw mode (needs more details)</span> |
| <span class="sd"> :param job_id: job ID (needs more details)</span> |
| <span class="sd"> :param pool: the Airflow pool that the task should run in</span> |
| <span class="sd"> :type pool: unicode</span> |
| <span class="sd"> :param cfg_path: the Path to the configuration file</span> |
| <span class="sd"> :type cfg_path: basestring</span> |
| <span class="sd"> :return: shell command that can be used to run the task instance</span> |
| <span class="sd"> """</span> |
| <span class="n">iso</span> <span class="o">=</span> <span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span> |
| <span class="n">cmd</span> <span class="o">=</span> <span class="p">[</span><span class="s2">"airflow"</span><span class="p">,</span> <span class="s2">"run"</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">dag_id</span><span class="p">),</span> <span class="nb">str</span><span class="p">(</span><span class="n">task_id</span><span class="p">),</span> <span class="nb">str</span><span class="p">(</span><span class="n">iso</span><span class="p">)]</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"--mark_success"</span><span class="p">])</span> <span class="k">if</span> <span class="n">mark_success</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"--pickle"</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">pickle_id</span><span class="p">)])</span> <span class="k">if</span> <span class="n">pickle_id</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"--job_id"</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">job_id</span><span class="p">)])</span> <span class="k">if</span> <span class="n">job_id</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"-A"</span><span class="p">])</span> <span class="k">if</span> <span class="n">ignore_all_deps</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"-i"</span><span class="p">])</span> <span class="k">if</span> <span class="n">ignore_task_deps</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"-I"</span><span class="p">])</span> <span class="k">if</span> <span class="n">ignore_depends_on_past</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"--force"</span><span class="p">])</span> <span class="k">if</span> <span class="n">ignore_ti_state</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"--local"</span><span class="p">])</span> <span class="k">if</span> <span class="n">local</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"--pool"</span><span class="p">,</span> <span class="n">pool</span><span class="p">])</span> <span class="k">if</span> <span class="n">pool</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"--raw"</span><span class="p">])</span> <span class="k">if</span> <span class="n">raw</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"-sd"</span><span class="p">,</span> <span class="n">file_path</span><span class="p">])</span> <span class="k">if</span> <span class="n">file_path</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">cmd</span><span class="o">.</span><span class="n">extend</span><span class="p">([</span><span class="s2">"--cfg_path"</span><span class="p">,</span> <span class="n">cfg_path</span><span class="p">])</span> <span class="k">if</span> <span class="n">cfg_path</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="k">return</span> <span class="n">cmd</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="TaskInstance.log_filepath"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.log_filepath">[docs]</a> <span class="k">def</span> <span class="nf">log_filepath</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">iso</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span> |
| <span class="n">log</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">path</span><span class="o">.</span><span class="n">expanduser</span><span class="p">(</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'core'</span><span class="p">,</span> <span class="s1">'BASE_LOG_FOLDER'</span><span class="p">))</span> |
| <span class="k">return</span> <span class="p">(</span><span class="s2">"</span><span class="si">{log}</span><span class="s2">/</span><span class="si">{dag_id}</span><span class="s2">/</span><span class="si">{task_id}</span><span class="s2">/</span><span class="si">{iso}</span><span class="s2">.log"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">log</span><span class="o">=</span><span class="n">log</span><span class="p">,</span> <span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">iso</span><span class="o">=</span><span class="n">iso</span><span class="p">))</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="TaskInstance.log_url"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.log_url">[docs]</a> <span class="k">def</span> <span class="nf">log_url</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">iso</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span> |
| <span class="n">base_url</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'webserver'</span><span class="p">,</span> <span class="s1">'BASE_URL'</span><span class="p">)</span> |
| <span class="n">relative_url</span> <span class="o">=</span> <span class="s1">'/log?execution_date=</span><span class="si">{iso}</span><span class="s1">&amp;task_id=</span><span class="si">{task_id}</span><span class="s1">&amp;dag_id=</span><span class="si">{dag_id}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">iso</span><span class="o">=</span><span class="n">quote_plus</span><span class="p">(</span><span class="n">iso</span><span class="p">),</span> <span class="n">task_id</span><span class="o">=</span><span class="n">quote_plus</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">),</span> <span class="n">dag_id</span><span class="o">=</span><span class="n">quote_plus</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">))</span> |
| |
| <span class="k">if</span> <span class="n">conf</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">'webserver'</span><span class="p">,</span> <span class="s1">'rbac'</span><span class="p">):</span> |
| <span class="k">return</span> <span class="s1">'</span><span class="si">{base_url}{relative_url}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">base_url</span><span class="o">=</span><span class="n">base_url</span><span class="p">,</span> <span class="n">relative_url</span><span class="o">=</span><span class="n">relative_url</span><span class="p">)</span> |
| <span class="k">return</span> <span class="s1">'</span><span class="si">{base_url}</span><span class="s1">/admin/airflow</span><span class="si">{relative_url}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">base_url</span><span class="o">=</span><span class="n">base_url</span><span class="p">,</span> <span class="n">relative_url</span><span class="o">=</span><span class="n">relative_url</span><span class="p">)</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="TaskInstance.mark_success_url"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.mark_success_url">[docs]</a> <span class="k">def</span> <span class="nf">mark_success_url</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">iso</span> <span class="o">=</span> <span class="n">quote</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">())</span> |
| <span class="n">base_url</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'webserver'</span><span class="p">,</span> <span class="s1">'BASE_URL'</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">base_url</span> <span class="o">+</span> <span class="p">(</span> |
| <span class="s2">"/success"</span> |
| <span class="s2">"?task_id=</span><span class="si">{task_id}</span><span class="s2">"</span> |
| <span class="s2">"&amp;dag_id=</span><span class="si">{dag_id}</span><span class="s2">"</span> |
| <span class="s2">"&amp;execution_date=</span><span class="si">{iso}</span><span class="s2">"</span> |
| <span class="s2">"&amp;upstream=false"</span> |
| <span class="s2">"&amp;downstream=false"</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">iso</span><span class="o">=</span><span class="n">iso</span><span class="p">)</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.current_state"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.current_state">[docs]</a> <span class="k">def</span> <span class="nf">current_state</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Get the very latest state from the database. If a session is passed,</span> |
| <span class="sd"> we use it and looking up the state becomes part of the session; otherwise</span> |
| <span class="sd"> a new session is used.</span> |
| <span class="sd"> """</span> |
| <span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span> |
| <span class="n">ti</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> |
| <span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">TI</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> |
| <span class="n">TI</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span> |
| <span class="k">if</span> <span class="n">ti</span><span class="p">:</span> |
| <span class="n">state</span> <span class="o">=</span> <span class="n">ti</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">state</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">state</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">return</span> <span class="n">state</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.error"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.error">[docs]</a> <span class="k">def</span> <span class="nf">error</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Forces the task instance's state to FAILED in the database.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">"Recording the task instance as FAILED"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">FAILED</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.refresh_from_db"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.refresh_from_db">[docs]</a> <span class="k">def</span> <span class="nf">refresh_from_db</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">lock_for_update</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">refresh_executor_config</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Refreshes the task instance from the database based on the primary key</span> |
| |
| <span class="sd"> :param refresh_executor_config: if True, revert executor config to</span> |
| <span class="sd"> result from DB. Often, however, we will want to keep the newest</span> |
| <span class="sd"> version</span> |
| <span class="sd"> :param lock_for_update: if True, indicates that the database should</span> |
| <span class="sd"> lock the TaskInstance (issuing a FOR UPDATE clause) until the</span> |
| <span class="sd"> session is committed.</span> |
| <span class="sd"> """</span> |
| <span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span> |
| |
| <span class="n">qry</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> |
| <span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">TI</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> |
| <span class="n">TI</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">lock_for_update</span><span class="p">:</span> |
| <span class="n">ti</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">with_for_update</span><span class="p">()</span><span class="o">.</span><span class="n">first</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">ti</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">first</span><span class="p">()</span> |
| <span class="k">if</span> <span class="n">ti</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">state</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">start_date</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">end_date</span> |
| <span class="c1"># Get the raw value of try_number column, don't read through the</span> |
| <span class="c1"># accessor here otherwise it will be incremented by one already.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">_try_number</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">max_tries</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">hostname</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">hostname</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pid</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">pid</span> |
| <span class="k">if</span> <span class="n">refresh_executor_config</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">executor_config</span> <span class="o">=</span> <span class="n">ti</span><span class="o">.</span><span class="n">executor_config</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="kc">None</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.clear_xcom_data"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.clear_xcom_data">[docs]</a> <span class="k">def</span> <span class="nf">clear_xcom_data</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Clears all XCom data from the database for the task instance</span> |
| <span class="sd"> """</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">XCom</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> |
| <span class="n">XCom</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">XCom</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> |
| <span class="n">XCom</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">delete</span><span class="p">()</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="TaskInstance.key"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.key">[docs]</a> <span class="k">def</span> <span class="nf">key</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Returns a tuple that identifies the task instance uniquely</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.set_state"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.set_state">[docs]</a> <span class="k">def</span> <span class="nf">set_state</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">commit</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">commit</span><span class="p">:</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="TaskInstance.is_premature"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.is_premature">[docs]</a> <span class="k">def</span> <span class="nf">is_premature</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Returns whether a task is in UP_FOR_RETRY state and its retry interval</span> |
| <span class="sd"> has not yet elapsed.</span> |
| <span class="sd"> """</span> |
| <span class="c1"># is the task still in the retry waiting period?</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">ready_for_retry</span><span class="p">()</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.are_dependents_done"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.are_dependents_done">[docs]</a> <span class="k">def</span> <span class="nf">are_dependents_done</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Checks whether the dependents of this task instance have all succeeded.</span> |
| <span class="sd"> This is meant to be used by wait_for_downstream.</span> |
| |
| <span class="sd"> This is useful when you do not want to start processing the next</span> |
| <span class="sd"> schedule of a task until the dependents are done. For instance,</span> |
| <span class="sd"> if the task DROPs and recreates a table.</span> |
| <span class="sd"> """</span> |
| <span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">task</span><span class="o">.</span><span class="n">downstream_task_ids</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">True</span> |
| |
| <span class="n">ti</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">func</span><span class="o">.</span><span class="n">count</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">task_id</span><span class="p">))</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> |
| <span class="n">TaskInstance</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">TaskInstance</span><span class="o">.</span><span class="n">task_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">downstream_task_ids</span><span class="p">),</span> |
| <span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> |
| <span class="n">TaskInstance</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">,</span> |
| <span class="p">)</span> |
| <span class="n">count</span> <span class="o">=</span> <span class="n">ti</span><span class="p">[</span><span class="mi">0</span><span class="p">][</span><span class="mi">0</span><span class="p">]</span> |
| <span class="k">return</span> <span class="n">count</span> <span class="o">==</span> <span class="nb">len</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">downstream_task_ids</span><span class="p">)</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance._get_previous_ti"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance._get_previous_ti">[docs]</a> <span class="k">def</span> <span class="nf">_get_previous_ti</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="c1"># type: (Optional[str], Session) -> Optional['TaskInstance']</span> |
| <span class="n">dag</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span> |
| <span class="k">if</span> <span class="n">dag</span><span class="p">:</span> |
| <span class="n">dr</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_dagrun</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span> |
| |
| <span class="c1"># LEGACY: most likely running from unit tests</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">dr</span><span class="p">:</span> |
| <span class="c1"># Means that this TI is NOT being run from a DR, but from a catchup</span> |
| <span class="n">previous_scheduled_date</span> <span class="o">=</span> <span class="n">dag</span><span class="o">.</span><span class="n">previous_schedule</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">previous_scheduled_date</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">None</span> |
| |
| <span class="k">return</span> <span class="n">TaskInstance</span><span class="p">(</span><span class="n">task</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="p">,</span> <span class="n">execution_date</span><span class="o">=</span><span class="n">previous_scheduled_date</span><span class="p">)</span> |
| |
| <span class="n">dr</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">dag</span> |
| |
| <span class="c1"># We always ignore schedule in dagrun lookup when `state` is given or `schedule_interval is None`.</span> |
| <span class="c1"># For legacy reasons, when `catchup=True`, we use `get_previous_scheduled_dagrun` unless</span> |
| <span class="c1"># `ignore_schedule` is `True`.</span> |
| <span class="n">ignore_schedule</span> <span class="o">=</span> <span class="n">state</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="ow">or</span> <span class="n">dag</span><span class="o">.</span><span class="n">schedule_interval</span> <span class="ow">is</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="n">dag</span><span class="o">.</span><span class="n">catchup</span> <span class="ow">is</span> <span class="kc">True</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">ignore_schedule</span><span class="p">:</span> |
| <span class="n">last_dagrun</span> <span class="o">=</span> <span class="n">dr</span><span class="o">.</span><span class="n">get_previous_scheduled_dagrun</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">last_dagrun</span> <span class="o">=</span> <span class="n">dr</span><span class="o">.</span><span class="n">get_previous_dagrun</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span> <span class="n">state</span><span class="o">=</span><span class="n">state</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">last_dagrun</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">last_dagrun</span><span class="o">.</span><span class="n">get_task_instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="kc">None</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="TaskInstance.previous_ti"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.previous_ti">[docs]</a> <span class="k">def</span> <span class="nf">previous_ti</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="c1"># type: () -> Optional['TaskInstance']</span> |
| <span class="sd">"""The task instance for the task that ran before this task instance."""</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_previous_ti</span><span class="p">()</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="TaskInstance.previous_ti_success"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.previous_ti_success">[docs]</a> <span class="k">def</span> <span class="nf">previous_ti_success</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="c1"># type: () -> Optional['TaskInstance']</span> |
| <span class="sd">"""The ti from prior successful dag run for this task, by execution date."""</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_previous_ti</span><span class="p">(</span><span class="n">state</span><span class="o">=</span><span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">)</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="TaskInstance.previous_execution_date_success"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.previous_execution_date_success">[docs]</a> <span class="k">def</span> <span class="nf">previous_execution_date_success</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="c1"># type: () -> Optional[pendulum.datetime]</span> |
| <span class="sd">"""The execution date from property previous_ti_success."""</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"previous_execution_date_success was called"</span><span class="p">)</span> |
| <span class="n">prev_ti</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_previous_ti</span><span class="p">(</span><span class="n">state</span><span class="o">=</span><span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">prev_ti</span> <span class="ow">and</span> <span class="n">prev_ti</span><span class="o">.</span><span class="n">execution_date</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="TaskInstance.previous_start_date_success"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.previous_start_date_success">[docs]</a> <span class="k">def</span> <span class="nf">previous_start_date_success</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> <span class="c1"># type: () -> Optional[pendulum.datetime]</span> |
| <span class="sd">"""The start date from property previous_ti_success."""</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"previous_start_date_success was called"</span><span class="p">)</span> |
| <span class="n">prev_ti</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_previous_ti</span><span class="p">(</span><span class="n">state</span><span class="o">=</span><span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">prev_ti</span> <span class="ow">and</span> <span class="n">prev_ti</span><span class="o">.</span><span class="n">start_date</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.are_dependencies_met"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.are_dependencies_met">[docs]</a> <span class="k">def</span> <span class="nf">are_dependencies_met</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">dep_context</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">verbose</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Returns whether or not all the conditions are met for this task instance to be run</span> |
| <span class="sd"> given the context for the dependencies (e.g. a task instance being force run from</span> |
| <span class="sd"> the UI will ignore some dependencies).</span> |
| |
| <span class="sd"> :param dep_context: The execution context that determines the dependencies that</span> |
| <span class="sd"> should be evaluated.</span> |
| <span class="sd"> :type dep_context: DepContext</span> |
| <span class="sd"> :param session: database session</span> |
| <span class="sd"> :type session: sqlalchemy.orm.session.Session</span> |
| <span class="sd"> :param verbose: whether to log details on failed dependencies at</span> |
| <span class="sd"> info level (True) or debug level (False)</span> |
| <span class="sd"> :type verbose: bool</span> |
| <span class="sd"> """</span> |
| <span class="n">dep_context</span> <span class="o">=</span> <span class="n">dep_context</span> <span class="ow">or</span> <span class="n">DepContext</span><span class="p">()</span> |
| <span class="n">failed</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="n">verbose_aware_logger</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span> <span class="k">if</span> <span class="n">verbose</span> <span class="k">else</span> <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span> |
| <span class="k">for</span> <span class="n">dep_status</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_failed_dep_statuses</span><span class="p">(</span> |
| <span class="n">dep_context</span><span class="o">=</span><span class="n">dep_context</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">):</span> |
| <span class="n">failed</span> <span class="o">=</span> <span class="kc">True</span> |
| |
| <span class="n">verbose_aware_logger</span><span class="p">(</span> |
| <span class="s2">"Dependencies not met for </span><span class="si">%s</span><span class="s2">, dependency '</span><span class="si">%s</span><span class="s2">' FAILED: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">dep_name</span><span class="p">,</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">reason</span> |
| <span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">failed</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">False</span> |
| |
| <span class="n">verbose_aware_logger</span><span class="p">(</span><span class="s2">"Dependencies all met for </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span> |
| <span class="k">return</span> <span class="kc">True</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.get_failed_dep_statuses"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.get_failed_dep_statuses">[docs]</a> <span class="k">def</span> <span class="nf">get_failed_dep_statuses</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">dep_context</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="n">dep_context</span> <span class="o">=</span> <span class="n">dep_context</span> <span class="ow">or</span> <span class="n">DepContext</span><span class="p">()</span> |
| <span class="k">for</span> <span class="n">dep</span> <span class="ow">in</span> <span class="n">dep_context</span><span class="o">.</span><span class="n">deps</span> <span class="o">|</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">deps</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">dep_status</span> <span class="ow">in</span> <span class="n">dep</span><span class="o">.</span><span class="n">get_dep_statuses</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">session</span><span class="p">,</span> |
| <span class="n">dep_context</span><span class="p">):</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span> |
| <span class="s2">"</span><span class="si">%s</span><span class="s2"> dependency '</span><span class="si">%s</span><span class="s2">' PASSED: </span><span class="si">%s</span><span class="s2">, </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> |
| <span class="bp">self</span><span class="p">,</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">dep_name</span><span class="p">,</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">passed</span><span class="p">,</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">reason</span> |
| <span class="p">)</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">dep_status</span><span class="o">.</span><span class="n">passed</span><span class="p">:</span> |
| <span class="k">yield</span> <span class="n">dep_status</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.__repr__"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.__repr__">[docs]</a> <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">(</span> |
| <span class="s2">"<TaskInstance: </span><span class="si">{ti.dag_id}</span><span class="s2">.</span><span class="si">{ti.task_id}</span><span class="s2"> "</span> |
| <span class="s2">"</span><span class="si">{ti.execution_date}</span><span class="s2"> [</span><span class="si">{ti.state}</span><span class="s2">]>"</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">ti</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.next_retry_datetime"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.next_retry_datetime">[docs]</a> <span class="k">def</span> <span class="nf">next_retry_datetime</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Get datetime of the next retry if the task instance fails. For exponential</span> |
| <span class="sd"> backoff, retry_delay is used as base and will be converted to seconds.</span> |
| <span class="sd"> """</span> |
| <span class="n">delay</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retry_delay</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retry_exponential_backoff</span><span class="p">:</span> |
| <span class="c1"># If the min_backoff calculation is below 1, it will be converted to 0 via int. Thus,</span> |
| <span class="c1"># we must round up prior to converting to an int, otherwise a divide by zero error</span> |
| <span class="c1"># will occur in the modded_hash calculation.</span> |
| <span class="n">min_backoff</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">math</span><span class="o">.</span><span class="n">ceil</span><span class="p">(</span><span class="n">delay</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span> <span class="o">*</span> <span class="p">(</span><span class="mi">2</span> <span class="o">**</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">-</span> <span class="mi">2</span><span class="p">))))</span> |
| <span class="c1"># deterministic per task instance</span> |
| <span class="nb">hash</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">hashlib</span><span class="o">.</span><span class="n">sha1</span><span class="p">(</span><span class="s2">"</span><span class="si">{}</span><span class="s2">#</span><span class="si">{}</span><span class="s2">#</span><span class="si">{}</span><span class="s2">#</span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span><span class="p">)</span> |
| <span class="o">.</span><span class="n">encode</span><span class="p">(</span><span class="s1">'utf-8'</span><span class="p">))</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">(),</span> <span class="mi">16</span><span class="p">)</span> |
| <span class="c1"># between min_backoff and 2 * min_backoff - 1 (inclusive)</span> |
| <span class="n">modded_hash</span> <span class="o">=</span> <span class="n">min_backoff</span> <span class="o">+</span> <span class="nb">hash</span> <span class="o">%</span> <span class="n">min_backoff</span> |
| <span class="c1"># timedelta has a maximum representable value. The exponentiation</span> |
| <span class="c1"># here means this value can be exceeded after a certain number</span> |
| <span class="c1"># of tries (around 50 if the initial delay is 1s, even fewer if</span> |
| <span class="c1"># the delay is larger). Cap the value here before creating a</span> |
| <span class="c1"># timedelta object so the operation doesn't fail.</span> |
| <span class="n">delay_backoff_in_seconds</span> <span class="o">=</span> <span class="nb">min</span><span class="p">(</span> |
| <span class="n">modded_hash</span><span class="p">,</span> |
| <span class="n">timedelta</span><span class="o">.</span><span class="n">max</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span> <span class="o">-</span> <span class="mi">1</span> |
| <span class="p">)</span> |
| <span class="n">delay</span> <span class="o">=</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">delay_backoff_in_seconds</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">max_retry_delay</span><span class="p">:</span> |
| <span class="n">delay</span> <span class="o">=</span> <span class="nb">min</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">max_retry_delay</span><span class="p">,</span> <span class="n">delay</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">+</span> <span class="n">delay</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.ready_for_retry"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.ready_for_retry">[docs]</a> <span class="k">def</span> <span class="nf">ready_for_retry</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Checks on whether the task instance is in the right state and timeframe</span> |
| <span class="sd"> to be retried.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">next_retry_datetime</span><span class="p">()</span> <span class="o"><</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">())</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.pool_full"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.pool_full">[docs]</a> <span class="k">def</span> <span class="nf">pool_full</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Returns True when the slot pool has no open slots left for this</span> |
| <span class="sd"> task to run; False if the task has no pool or the pool is not found</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">pool</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">False</span> |
| |
| <span class="n">pool</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">session</span> |
| <span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">Pool</span><span class="p">)</span> |
| <span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">Pool</span><span class="o">.</span><span class="n">pool</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">pool</span><span class="p">)</span> |
| <span class="o">.</span><span class="n">first</span><span class="p">()</span> |
| <span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">pool</span><span class="p">:</span> |
| <span class="k">return</span> <span class="kc">False</span> |
| <span class="n">open_slots</span> <span class="o">=</span> <span class="n">pool</span><span class="o">.</span><span class="n">open_slots</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">open_slots</span> <span class="o"><=</span> <span class="mi">0</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.get_dagrun"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.get_dagrun">[docs]</a> <span class="k">def</span> <span class="nf">get_dagrun</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Returns the DagRun for this TaskInstance</span> |
| |
| <span class="sd"> :param session:</span> |
| <span class="sd"> :return: DagRun</span> |
| <span class="sd"> """</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.dagrun</span> <span class="k">import</span> <span class="n">DagRun</span> <span class="c1"># Avoid circular import</span> |
| <span class="n">dr</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagRun</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> |
| <span class="n">DagRun</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">DagRun</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">first</span><span class="p">()</span> |
| |
| <span class="k">return</span> <span class="n">dr</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance._check_and_change_state_before_execution"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance._check_and_change_state_before_execution">[docs]</a> <span class="k">def</span> <span class="nf">_check_and_change_state_before_execution</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">verbose</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">ignore_all_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_task_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">test_mode</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Checks dependencies and then sets state to RUNNING if they are met. Returns</span> |
| <span class="sd"> True if and only if state is set to RUNNING, which implies that task should be</span> |
| <span class="sd"> executed, in preparation for _run_raw_task</span> |
| |
| <span class="sd"> :param verbose: whether to turn on more verbose logging</span> |
| <span class="sd"> :type verbose: bool</span> |
| <span class="sd"> :param ignore_all_deps: Ignore all of the non-critical dependencies, just runs</span> |
| <span class="sd"> :type ignore_all_deps: bool</span> |
| <span class="sd"> :param ignore_depends_on_past: Ignore depends_on_past DAG attribute</span> |
| <span class="sd"> :type ignore_depends_on_past: bool</span> |
| <span class="sd"> :param ignore_task_deps: Don't check the dependencies of this TI's task</span> |
| <span class="sd"> :type ignore_task_deps: bool</span> |
| <span class="sd"> :param ignore_ti_state: Disregards previous task instance state</span> |
| <span class="sd"> :type ignore_ti_state: bool</span> |
| <span class="sd"> :param mark_success: Don't run the task, mark its state as success</span> |
| <span class="sd"> :type mark_success: bool</span> |
| <span class="sd"> :param test_mode: Doesn't record success or failure in the DB</span> |
| <span class="sd"> :type test_mode: bool</span> |
| <span class="sd"> :param pool: specifies the pool to use to run the task instance</span> |
| <span class="sd"> :type pool: str</span> |
| <span class="sd"> :return: whether the state was changed to running or not</span> |
| <span class="sd"> :rtype: bool</span> |
| <span class="sd"> """</span> |
| <span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pool</span> <span class="o">=</span> <span class="n">pool</span> <span class="ow">or</span> <span class="n">task</span><span class="o">.</span><span class="n">pool</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_mode</span> <span class="o">=</span> <span class="n">test_mode</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span> <span class="n">lock_for_update</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">job_id</span> <span class="o">=</span> <span class="n">job_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">hostname</span> <span class="o">=</span> <span class="n">get_hostname</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">operator</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">ignore_all_deps</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">ignore_ti_state</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">:</span> |
| <span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">'previously_succeeded'</span><span class="p">,</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span> |
| |
| <span class="c1"># TODO: Logging needs cleanup, not clear what is being printed</span> |
| <span class="n">hr</span> <span class="o">=</span> <span class="s2">"</span><span class="se">\n</span><span class="s2">"</span> <span class="o">+</span> <span class="p">(</span><span class="s2">"-"</span> <span class="o">*</span> <span class="mi">80</span><span class="p">)</span> <span class="c1"># Line break</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">mark_success</span><span class="p">:</span> |
| <span class="c1"># Firstly find non-runnable and non-requeueable tis.</span> |
| <span class="c1"># Since mark_success is not set, we do nothing.</span> |
| <span class="n">non_requeueable_dep_context</span> <span class="o">=</span> <span class="n">DepContext</span><span class="p">(</span> |
| <span class="n">deps</span><span class="o">=</span><span class="n">RUNNING_DEPS</span> <span class="o">-</span> <span class="n">REQUEUEABLE_DEPS</span><span class="p">,</span> |
| <span class="n">ignore_all_deps</span><span class="o">=</span><span class="n">ignore_all_deps</span><span class="p">,</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">,</span> |
| <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span> |
| <span class="n">ignore_task_deps</span><span class="o">=</span><span class="n">ignore_task_deps</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">are_dependencies_met</span><span class="p">(</span> |
| <span class="n">dep_context</span><span class="o">=</span><span class="n">non_requeueable_dep_context</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span> |
| <span class="n">verbose</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> |
| <span class="k">return</span> <span class="kc">False</span> |
| |
| <span class="c1"># For reporting purposes, we report based on 1-indexed,</span> |
| <span class="c1"># not 0-indexed lists (i.e. Attempt 1 instead of</span> |
| <span class="c1"># Attempt 0 for the first attempt).</span> |
| <span class="c1"># Set the task start date. In case it was re-scheduled use the initial</span> |
| <span class="c1"># start date that is recorded in task_reschedule table</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> |
| <span class="n">task_reschedules</span> <span class="o">=</span> <span class="n">TaskReschedule</span><span class="o">.</span><span class="n">find_for_task_instance</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">task_reschedules</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">task_reschedules</span><span class="p">[</span><span class="mi">0</span><span class="p">]</span><span class="o">.</span><span class="n">start_date</span> |
| |
| <span class="c1"># Secondly we find non-runnable but requeueable tis. We reset its state.</span> |
| <span class="c1"># This is because we might have hit concurrency limits,</span> |
| <span class="c1"># e.g. because of backfilling.</span> |
| <span class="n">dep_context</span> <span class="o">=</span> <span class="n">DepContext</span><span class="p">(</span> |
| <span class="n">deps</span><span class="o">=</span><span class="n">REQUEUEABLE_DEPS</span><span class="p">,</span> |
| <span class="n">ignore_all_deps</span><span class="o">=</span><span class="n">ignore_all_deps</span><span class="p">,</span> |
| <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span> |
| <span class="n">ignore_task_deps</span><span class="o">=</span><span class="n">ignore_task_deps</span><span class="p">,</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">are_dependencies_met</span><span class="p">(</span> |
| <span class="n">dep_context</span><span class="o">=</span><span class="n">dep_context</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">,</span> |
| <span class="n">verbose</span><span class="o">=</span><span class="kc">True</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">NONE</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="n">hr</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span> |
| <span class="s2">"Rescheduling due to concurrency limits reached "</span> |
| <span class="s2">"at task runtime. Attempt </span><span class="si">%s</span><span class="s2"> of "</span> |
| <span class="s2">"</span><span class="si">%s</span><span class="s2">. State set to NONE."</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">+</span> <span class="mi">1</span> |
| <span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="n">hr</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">queued_dttm</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> |
| <span class="k">return</span> <span class="kc">False</span> |
| |
| <span class="c1"># print status message</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="n">hr</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Starting attempt </span><span class="si">%s</span><span class="s2"> of </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span> <span class="o">+</span> <span class="mi">1</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="n">hr</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">+=</span> <span class="mi">1</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">Log</span><span class="p">(</span><span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">,</span> <span class="bp">self</span><span class="p">))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pid</span> <span class="o">=</span> <span class="n">os</span><span class="o">.</span><span class="n">getpid</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> |
| |
| <span class="c1"># Closing all pooled connections to prevent</span> |
| <span class="c1"># "max number of connections reached"</span> |
| <span class="n">settings</span><span class="o">.</span><span class="n">engine</span><span class="o">.</span><span class="n">dispose</span><span class="p">()</span> |
| <span class="k">if</span> <span class="n">verbose</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">mark_success</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Marking success for </span><span class="si">%s</span><span class="s2"> on </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s2">"Executing </span><span class="si">%s</span><span class="s2"> on </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span> |
| <span class="k">return</span> <span class="kc">True</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <span class="nd">@Sentry</span><span class="o">.</span><span class="n">enrich_errors</span> |
| <div class="viewcode-block" id="TaskInstance._run_raw_task"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance._run_raw_task">[docs]</a> <span class="k">def</span> <span class="nf">_run_raw_task</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">test_mode</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Immediately runs the task (without checking or changing db state</span> |
| <span class="sd"> before execution) and then sets the appropriate final state after</span> |
| <span class="sd"> completion and runs any post-execute callbacks. Meant to be called</span> |
| <span class="sd"> only after another function changes the state to running.</span> |
| |
| <span class="sd"> :param mark_success: Don't run the task, mark its state as success</span> |
| <span class="sd"> :type mark_success: bool</span> |
| <span class="sd"> :param test_mode: Doesn't record success or failure in the DB</span> |
| <span class="sd"> :type test_mode: bool</span> |
| <span class="sd"> :param pool: specifies the pool to use to run the task instance</span> |
| <span class="sd"> :type pool: str</span> |
| <span class="sd"> """</span> |
| <span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pool</span> <span class="o">=</span> <span class="n">pool</span> <span class="ow">or</span> <span class="n">task</span><span class="o">.</span><span class="n">pool</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">test_mode</span> <span class="o">=</span> <span class="n">test_mode</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">(</span><span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">job_id</span> <span class="o">=</span> <span class="n">job_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">hostname</span> <span class="o">=</span> <span class="n">get_hostname</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">operator</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span> |
| |
| <span class="n">context</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">actual_start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">mark_success</span><span class="p">:</span> |
| <span class="n">context</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_template_context</span><span class="p">()</span> |
| |
| <span class="n">task_copy</span> <span class="o">=</span> <span class="n">copy</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">task</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task</span> <span class="o">=</span> <span class="n">task_copy</span> |
| |
| <span class="k">def</span> <span class="nf">signal_handler</span><span class="p">(</span><span class="n">signum</span><span class="p">,</span> <span class="n">frame</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">"Received SIGTERM. Terminating subprocesses."</span><span class="p">)</span> |
| <span class="n">task_copy</span><span class="o">.</span><span class="n">on_kill</span><span class="p">()</span> |
| <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span><span class="s2">"Task received SIGTERM signal"</span><span class="p">)</span> |
| <span class="n">signal</span><span class="o">.</span><span class="n">signal</span><span class="p">(</span><span class="n">signal</span><span class="o">.</span><span class="n">SIGTERM</span><span class="p">,</span> <span class="n">signal_handler</span><span class="p">)</span> |
| |
| <span class="c1"># Don't clear Xcom until the task is certain to execute</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">clear_xcom_data</span><span class="p">()</span> |
| |
| <span class="n">start_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">render_templates</span><span class="p">(</span><span class="n">context</span><span class="o">=</span><span class="n">context</span><span class="p">)</span> |
| <span class="n">task_copy</span><span class="o">.</span><span class="n">pre_execute</span><span class="p">(</span><span class="n">context</span><span class="o">=</span><span class="n">context</span><span class="p">)</span> |
| |
| <span class="c1"># If a timeout is specified for the task, make it fail</span> |
| <span class="c1"># if it goes beyond</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="n">task_copy</span><span class="o">.</span><span class="n">execution_timeout</span><span class="p">:</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">with</span> <span class="n">timeout</span><span class="p">(</span><span class="nb">int</span><span class="p">(</span> |
| <span class="n">task_copy</span><span class="o">.</span><span class="n">execution_timeout</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">())):</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="n">task_copy</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="o">=</span><span class="n">context</span><span class="p">)</span> |
| <span class="k">except</span> <span class="n">AirflowTaskTimeout</span><span class="p">:</span> |
| <span class="n">task_copy</span><span class="o">.</span><span class="n">on_kill</span><span class="p">()</span> |
| <span class="k">raise</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="n">task_copy</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="n">context</span><span class="o">=</span><span class="n">context</span><span class="p">)</span> |
| |
| <span class="c1"># If the task returns a result, push an XCom containing it</span> |
| <span class="k">if</span> <span class="n">task_copy</span><span class="o">.</span><span class="n">do_xcom_push</span> <span class="ow">and</span> <span class="n">result</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span><span class="n">key</span><span class="o">=</span><span class="n">XCOM_RETURN_KEY</span><span class="p">,</span> <span class="n">value</span><span class="o">=</span><span class="n">result</span><span class="p">)</span> |
| |
| <span class="n">task_copy</span><span class="o">.</span><span class="n">post_execute</span><span class="p">(</span><span class="n">context</span><span class="o">=</span><span class="n">context</span><span class="p">,</span> <span class="n">result</span><span class="o">=</span><span class="n">result</span><span class="p">)</span> |
| |
| <span class="n">end_time</span> <span class="o">=</span> <span class="n">time</span><span class="o">.</span><span class="n">time</span><span class="p">()</span> |
| <span class="n">duration</span> <span class="o">=</span> <span class="n">end_time</span> <span class="o">-</span> <span class="n">start_time</span> |
| <span class="n">Stats</span><span class="o">.</span><span class="n">timing</span><span class="p">(</span> |
| <span class="s1">'dag.</span><span class="si">{dag_id}</span><span class="s1">.</span><span class="si">{task_id}</span><span class="s1">.duration'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="n">task_copy</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="n">task_copy</span><span class="o">.</span><span class="n">task_id</span><span class="p">),</span> |
| <span class="n">duration</span><span class="p">)</span> |
| |
| <span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">'operator_successes_</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">),</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span> |
| <span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">'ti_successes'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">(</span><span class="n">lock_for_update</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span> |
| <span class="k">except</span> <span class="n">AirflowSkipException</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="c1"># log only if exception has any arguments to prevent log flooding</span> |
| <span class="k">if</span> <span class="n">e</span><span class="o">.</span><span class="n">args</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">(</span><span class="n">lock_for_update</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">SKIPPED</span> |
| <span class="k">except</span> <span class="n">AirflowRescheduleException</span> <span class="k">as</span> <span class="n">reschedule_exception</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_handle_reschedule</span><span class="p">(</span><span class="n">actual_start_date</span><span class="p">,</span> <span class="n">reschedule_exception</span><span class="p">,</span> <span class="n">test_mode</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span> |
| <span class="k">return</span> |
| <span class="k">except</span> <span class="n">AirflowException</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">refresh_from_db</span><span class="p">()</span> |
| <span class="c1"># for case when task is marked as success/failed externally</span> |
| <span class="c1"># current behavior doesn't hit the success callback</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="ow">in</span> <span class="p">{</span><span class="n">State</span><span class="o">.</span><span class="n">SUCCESS</span><span class="p">,</span> <span class="n">State</span><span class="o">.</span><span class="n">FAILED</span><span class="p">}:</span> |
| <span class="k">return</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">handle_failure</span><span class="p">(</span><span class="n">e</span><span class="p">,</span> <span class="n">test_mode</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span> |
| <span class="k">raise</span> |
| <span class="k">except</span> <span class="p">(</span><span class="ne">Exception</span><span class="p">,</span> <span class="ne">KeyboardInterrupt</span><span class="p">)</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">handle_failure</span><span class="p">(</span><span class="n">e</span><span class="p">,</span> <span class="n">test_mode</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span> |
| <span class="k">raise</span> |
| |
| <span class="c1"># Success callback</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">on_success_callback</span><span class="p">:</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">on_success_callback</span><span class="p">(</span><span class="n">context</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e3</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">"Failed when executing success callback"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e3</span><span class="p">)</span> |
| |
| <span class="c1"># Recording SUCCESS</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">set_duration</span><span class="p">()</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">Log</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">state</span><span class="p">,</span> <span class="bp">self</span><span class="p">))</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.run"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.run">[docs]</a> <span class="k">def</span> <span class="nf">run</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">verbose</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> |
| <span class="n">ignore_all_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_task_deps</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">test_mode</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">job_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="n">res</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_check_and_change_state_before_execution</span><span class="p">(</span> |
| <span class="n">verbose</span><span class="o">=</span><span class="n">verbose</span><span class="p">,</span> |
| <span class="n">ignore_all_deps</span><span class="o">=</span><span class="n">ignore_all_deps</span><span class="p">,</span> |
| <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="n">ignore_depends_on_past</span><span class="p">,</span> |
| <span class="n">ignore_task_deps</span><span class="o">=</span><span class="n">ignore_task_deps</span><span class="p">,</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">,</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span> |
| <span class="n">test_mode</span><span class="o">=</span><span class="n">test_mode</span><span class="p">,</span> |
| <span class="n">job_id</span><span class="o">=</span><span class="n">job_id</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">res</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_run_raw_task</span><span class="p">(</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span> |
| <span class="n">test_mode</span><span class="o">=</span><span class="n">test_mode</span><span class="p">,</span> |
| <span class="n">job_id</span><span class="o">=</span><span class="n">job_id</span><span class="p">,</span> |
| <span class="n">pool</span><span class="o">=</span><span class="n">pool</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="n">session</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.dry_run"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.dry_run">[docs]</a> <span class="k">def</span> <span class="nf">dry_run</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span> |
| <span class="n">task_copy</span> <span class="o">=</span> <span class="n">copy</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">task</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task</span> <span class="o">=</span> <span class="n">task_copy</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">render_templates</span><span class="p">()</span> |
| <span class="n">task_copy</span><span class="o">.</span><span class="n">dry_run</span><span class="p">()</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance._handle_reschedule"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance._handle_reschedule">[docs]</a> <span class="k">def</span> <span class="nf">_handle_reschedule</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">actual_start_date</span><span class="p">,</span> <span class="n">reschedule_exception</span><span class="p">,</span> <span class="n">test_mode</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">context</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="c1"># Don't record reschedule request in test mode</span> |
| <span class="k">if</span> <span class="n">test_mode</span><span class="p">:</span> |
| <span class="k">return</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">set_duration</span><span class="p">()</span> |
| |
| <span class="c1"># Log reschedule request</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">TaskReschedule</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span><span class="p">,</span> |
| <span class="n">actual_start_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span><span class="p">,</span> |
| <span class="n">reschedule_exception</span><span class="o">.</span><span class="n">reschedule_date</span><span class="p">))</span> |
| |
| <span class="c1"># set state</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RESCHEDULE</span> |
| |
| <span class="c1"># Decrement try_number so subsequent runs will use the same try number and write</span> |
| <span class="c1"># to same log file.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_try_number</span> <span class="o">-=</span> <span class="mi">1</span> |
| |
| <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Rescheduling task, marking task as UP_FOR_RESCHEDULE'</span><span class="p">)</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.handle_failure"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.handle_failure">[docs]</a> <span class="k">def</span> <span class="nf">handle_failure</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">error</span><span class="p">,</span> <span class="n">test_mode</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">context</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">error</span><span class="p">)</span> |
| <span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">set_duration</span><span class="p">()</span> |
| <span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">'operator_failures_</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">),</span> <span class="mi">1</span><span class="p">,</span> <span class="mi">1</span><span class="p">)</span> |
| <span class="n">Stats</span><span class="o">.</span><span class="n">incr</span><span class="p">(</span><span class="s1">'ti_failures'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">Log</span><span class="p">(</span><span class="n">State</span><span class="o">.</span><span class="n">FAILED</span><span class="p">,</span> <span class="bp">self</span><span class="p">))</span> |
| |
| <span class="c1"># Log failure duration</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">TaskFail</span><span class="p">(</span><span class="n">task</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span><span class="p">))</span> |
| |
| <span class="k">if</span> <span class="n">context</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">context</span><span class="p">[</span><span class="s1">'exception'</span><span class="p">]</span> <span class="o">=</span> <span class="n">error</span> |
| |
| <span class="c1"># Let's go deeper</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="c1"># Since this function is called only when the TI state is running,</span> |
| <span class="c1"># try_number contains the current try_number (not the next). We</span> |
| <span class="c1"># only mark task instance as FAILED if the next task instance</span> |
| <span class="c1"># try_number exceeds the max_tries.</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">is_eligible_to_retry</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Marking task as UP_FOR_RETRY'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">email_on_retry</span> <span class="ow">and</span> <span class="n">task</span><span class="o">.</span><span class="n">email</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">email_alert</span><span class="p">(</span><span class="n">error</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">State</span><span class="o">.</span><span class="n">FAILED</span> |
| <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">retries</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'All retries failed; marking task as FAILED'</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Marking task as FAILED.'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">email_on_failure</span> <span class="ow">and</span> <span class="n">task</span><span class="o">.</span><span class="n">email</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">email_alert</span><span class="p">(</span><span class="n">error</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e2</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s1">'Failed to send email to: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">email</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e2</span><span class="p">)</span> |
| |
| <span class="c1"># Handling callbacks pessimistically</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">UP_FOR_RETRY</span> <span class="ow">and</span> <span class="n">task</span><span class="o">.</span><span class="n">on_retry_callback</span><span class="p">:</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">on_retry_callback</span><span class="p">(</span><span class="n">context</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">FAILED</span> <span class="ow">and</span> <span class="n">task</span><span class="o">.</span><span class="n">on_failure_callback</span><span class="p">:</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">on_failure_callback</span><span class="p">(</span><span class="n">context</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e3</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span><span class="s2">"Failed at executing callback"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e3</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">test_mode</span><span class="p">:</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">merge</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.is_eligible_to_retry"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.is_eligible_to_retry">[docs]</a> <span class="k">def</span> <span class="nf">is_eligible_to_retry</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""Is task instance eligible for retry"""</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">retries</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">&lt;=</span> <span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="TaskInstance.get_template_context"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.get_template_context">[docs]</a> <span class="k">def</span> <span class="nf">get_template_context</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span> |
| <span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">macros</span> |
| <span class="n">tables</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="s1">'tables'</span> <span class="ow">in</span> <span class="n">task</span><span class="o">.</span><span class="n">params</span><span class="p">:</span> |
| <span class="n">tables</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">params</span><span class="p">[</span><span class="s1">'tables'</span><span class="p">]</span> |
| |
| <span class="n">params</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="n">run_id</span> <span class="o">=</span> <span class="s1">''</span> |
| <span class="n">dag_run</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="n">task</span><span class="p">,</span> <span class="s1">'dag'</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">params</span><span class="p">:</span> |
| <span class="n">params</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">params</span><span class="p">)</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.dagrun</span> <span class="k">import</span> <span class="n">DagRun</span> <span class="c1"># Avoid circular import</span> |
| <span class="n">dag_run</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">DagRun</span><span class="p">)</span> |
| <span class="o">.</span><span class="n">filter_by</span><span class="p">(</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">execution_date</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span> |
| <span class="o">.</span><span class="n">first</span><span class="p">()</span> |
| <span class="p">)</span> |
| <span class="n">run_id</span> <span class="o">=</span> <span class="n">dag_run</span><span class="o">.</span><span class="n">run_id</span> <span class="k">if</span> <span class="n">dag_run</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">expunge_all</span><span class="p">()</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> |
| |
| <span class="n">ds</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">'%Y-%m-</span><span class="si">%d</span><span class="s1">'</span><span class="p">)</span> |
| <span class="n">ts</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span> |
| <span class="n">yesterday_ds</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">-</span> <span class="n">timedelta</span><span class="p">(</span><span class="mi">1</span><span class="p">))</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">'%Y-%m-</span><span class="si">%d</span><span class="s1">'</span><span class="p">)</span> |
| <span class="n">tomorrow_ds</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">+</span> <span class="n">timedelta</span><span class="p">(</span><span class="mi">1</span><span class="p">))</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">'%Y-%m-</span><span class="si">%d</span><span class="s1">'</span><span class="p">)</span> |
| |
| <span class="c1"># For manually triggered dagruns that aren't run on a schedule, next/previous</span> |
| <span class="c1"># schedule dates don't make sense, and should be set to execution date for</span> |
| <span class="c1"># consistency with how execution_date is set for manually triggered tasks, i.e.</span> |
| <span class="c1"># triggered_date == execution_date.</span> |
| <span class="k">if</span> <span class="n">dag_run</span> <span class="ow">and</span> <span class="n">dag_run</span><span class="o">.</span><span class="n">external_trigger</span><span class="p">:</span> |
| <span class="n">prev_execution_date</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> |
| <span class="n">next_execution_date</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">prev_execution_date</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">previous_schedule</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span> |
| <span class="n">next_execution_date</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">following_schedule</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span> |
| |
| <span class="n">next_ds</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="n">next_ds_nodash</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="n">next_execution_date</span><span class="p">:</span> |
| <span class="n">next_ds</span> <span class="o">=</span> <span class="n">next_execution_date</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">'%Y-%m-</span><span class="si">%d</span><span class="s1">'</span><span class="p">)</span> |
| <span class="n">next_ds_nodash</span> <span class="o">=</span> <span class="n">next_ds</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'-'</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> |
| <span class="n">next_execution_date</span> <span class="o">=</span> <span class="n">pendulum</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="n">next_execution_date</span><span class="p">)</span> |
| |
| <span class="n">prev_ds</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="n">prev_ds_nodash</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">if</span> <span class="n">prev_execution_date</span><span class="p">:</span> |
| <span class="n">prev_ds</span> <span class="o">=</span> <span class="n">prev_execution_date</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">'%Y-%m-</span><span class="si">%d</span><span class="s1">'</span><span class="p">)</span> |
| <span class="n">prev_ds_nodash</span> <span class="o">=</span> <span class="n">prev_ds</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'-'</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> |
| <span class="n">prev_execution_date</span> <span class="o">=</span> <span class="n">pendulum</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="n">prev_execution_date</span><span class="p">)</span> |
| |
| <span class="n">ds_nodash</span> <span class="o">=</span> <span class="n">ds</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'-'</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> |
| <span class="n">ts_nodash</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="o">.</span><span class="n">strftime</span><span class="p">(</span><span class="s1">'%Y%m</span><span class="si">%d</span><span class="s1">T%H%M%S'</span><span class="p">)</span> |
| <span class="n">ts_nodash_with_tz</span> <span class="o">=</span> <span class="n">ts</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'-'</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">':'</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> |
| <span class="n">yesterday_ds_nodash</span> <span class="o">=</span> <span class="n">yesterday_ds</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'-'</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> |
| <span class="n">tomorrow_ds_nodash</span> <span class="o">=</span> <span class="n">tomorrow_ds</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'-'</span><span class="p">,</span> <span class="s1">''</span><span class="p">)</span> |
| |
| <span class="n">ti_key_str</span> <span class="o">=</span> <span class="s2">"</span><span class="si">{dag_id}</span><span class="s2">__</span><span class="si">{task_id}</span><span class="s2">__</span><span class="si">{ds_nodash}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="n">task</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="o">=</span><span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">ds_nodash</span><span class="o">=</span><span class="n">ds_nodash</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">task</span><span class="o">.</span><span class="n">params</span><span class="p">:</span> |
| <span class="n">params</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">params</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">conf</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="s1">'core'</span><span class="p">,</span> <span class="s1">'dag_run_conf_overrides_params'</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">overwrite_params_with_dag_run_conf</span><span class="p">(</span><span class="n">params</span><span class="o">=</span><span class="n">params</span><span class="p">,</span> <span class="n">dag_run</span><span class="o">=</span><span class="n">dag_run</span><span class="p">)</span> |
| |
| <span class="k">class</span> <span class="nc">VariableAccessor</span><span class="p">:</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Wrapper around Variable. This way you can get variables in templates by using</span> |
| <span class="sd"> {var.value.your_variable_name}.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">def</span> <span class="nf">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">item</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">var</span> |
| |
| <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">var</span><span class="p">)</span> |
| |
| <span class="k">class</span> <span class="nc">VariableJsonAccessor</span><span class="p">:</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Wrapper around deserialized Variables. This way you can get variables</span> |
| <span class="sd"> in templates by using {var.json.your_variable_name}.</span> |
| <span class="sd"> """</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="k">def</span> <span class="nf">__getattr__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">item</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">var</span> <span class="o">=</span> <span class="n">Variable</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">item</span><span class="p">,</span> <span class="n">deserialize_json</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">var</span> |
| |
| <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="nb">str</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">var</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="s1">'conf'</span><span class="p">:</span> <span class="n">conf</span><span class="p">,</span> |
| <span class="s1">'dag'</span><span class="p">:</span> <span class="n">task</span><span class="o">.</span><span class="n">dag</span><span class="p">,</span> |
| <span class="s1">'ds'</span><span class="p">:</span> <span class="n">ds</span><span class="p">,</span> |
| <span class="s1">'next_ds'</span><span class="p">:</span> <span class="n">next_ds</span><span class="p">,</span> |
| <span class="s1">'next_ds_nodash'</span><span class="p">:</span> <span class="n">next_ds_nodash</span><span class="p">,</span> |
| <span class="s1">'prev_ds'</span><span class="p">:</span> <span class="n">prev_ds</span><span class="p">,</span> |
| <span class="s1">'prev_ds_nodash'</span><span class="p">:</span> <span class="n">prev_ds_nodash</span><span class="p">,</span> |
| <span class="s1">'ds_nodash'</span><span class="p">:</span> <span class="n">ds_nodash</span><span class="p">,</span> |
| <span class="s1">'ts'</span><span class="p">:</span> <span class="n">ts</span><span class="p">,</span> |
| <span class="s1">'ts_nodash'</span><span class="p">:</span> <span class="n">ts_nodash</span><span class="p">,</span> |
| <span class="s1">'ts_nodash_with_tz'</span><span class="p">:</span> <span class="n">ts_nodash_with_tz</span><span class="p">,</span> |
| <span class="s1">'yesterday_ds'</span><span class="p">:</span> <span class="n">yesterday_ds</span><span class="p">,</span> |
| <span class="s1">'yesterday_ds_nodash'</span><span class="p">:</span> <span class="n">yesterday_ds_nodash</span><span class="p">,</span> |
| <span class="s1">'tomorrow_ds'</span><span class="p">:</span> <span class="n">tomorrow_ds</span><span class="p">,</span> |
| <span class="s1">'tomorrow_ds_nodash'</span><span class="p">:</span> <span class="n">tomorrow_ds_nodash</span><span class="p">,</span> |
| <span class="s1">'END_DATE'</span><span class="p">:</span> <span class="n">ds</span><span class="p">,</span> |
| <span class="s1">'end_date'</span><span class="p">:</span> <span class="n">ds</span><span class="p">,</span> |
| <span class="s1">'dag_run'</span><span class="p">:</span> <span class="n">dag_run</span><span class="p">,</span> |
| <span class="s1">'run_id'</span><span class="p">:</span> <span class="n">run_id</span><span class="p">,</span> |
| <span class="s1">'execution_date'</span><span class="p">:</span> <span class="n">pendulum</span><span class="o">.</span><span class="n">instance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">),</span> |
| <span class="s1">'prev_execution_date'</span><span class="p">:</span> <span class="n">prev_execution_date</span><span class="p">,</span> |
| <span class="s1">'prev_execution_date_success'</span><span class="p">:</span> <span class="n">lazy_object_proxy</span><span class="o">.</span><span class="n">Proxy</span><span class="p">(</span> |
| <span class="k">lambda</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">previous_execution_date_success</span><span class="p">),</span> |
| <span class="s1">'prev_start_date_success'</span><span class="p">:</span> <span class="n">lazy_object_proxy</span><span class="o">.</span><span class="n">Proxy</span><span class="p">(</span><span class="k">lambda</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">previous_start_date_success</span><span class="p">),</span> |
| <span class="s1">'next_execution_date'</span><span class="p">:</span> <span class="n">next_execution_date</span><span class="p">,</span> |
| <span class="s1">'latest_date'</span><span class="p">:</span> <span class="n">ds</span><span class="p">,</span> |
| <span class="s1">'macros'</span><span class="p">:</span> <span class="n">macros</span><span class="p">,</span> |
| <span class="s1">'params'</span><span class="p">:</span> <span class="n">params</span><span class="p">,</span> |
| <span class="s1">'tables'</span><span class="p">:</span> <span class="n">tables</span><span class="p">,</span> |
| <span class="s1">'task'</span><span class="p">:</span> <span class="n">task</span><span class="p">,</span> |
| <span class="s1">'task_instance'</span><span class="p">:</span> <span class="bp">self</span><span class="p">,</span> |
| <span class="s1">'ti'</span><span class="p">:</span> <span class="bp">self</span><span class="p">,</span> |
| <span class="s1">'task_instance_key_str'</span><span class="p">:</span> <span class="n">ti_key_str</span><span class="p">,</span> |
| <span class="s1">'test_mode'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">test_mode</span><span class="p">,</span> |
| <span class="s1">'var'</span><span class="p">:</span> <span class="p">{</span> |
| <span class="s1">'value'</span><span class="p">:</span> <span class="n">VariableAccessor</span><span class="p">(),</span> |
| <span class="s1">'json'</span><span class="p">:</span> <span class="n">VariableJsonAccessor</span><span class="p">()</span> |
| <span class="p">},</span> |
| <span class="s1">'inlets'</span><span class="p">:</span> <span class="n">task</span><span class="o">.</span><span class="n">inlets</span><span class="p">,</span> |
| <span class="s1">'outlets'</span><span class="p">:</span> <span class="n">task</span><span class="o">.</span><span class="n">outlets</span><span class="p">,</span> |
| <span class="p">}</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.overwrite_params_with_dag_run_conf"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.overwrite_params_with_dag_run_conf">[docs]</a> <span class="k">def</span> <span class="nf">overwrite_params_with_dag_run_conf</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">params</span><span class="p">,</span> <span class="n">dag_run</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">dag_run</span> <span class="ow">and</span> <span class="n">dag_run</span><span class="o">.</span><span class="n">conf</span><span class="p">:</span> |
| <span class="n">params</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">dag_run</span><span class="o">.</span><span class="n">conf</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.render_templates"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.render_templates">[docs]</a> <span class="k">def</span> <span class="nf">render_templates</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""Render templates in the operator fields."""</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">context</span><span class="p">:</span> |
| <span class="n">context</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_template_context</span><span class="p">()</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">render_template_fields</span><span class="p">(</span><span class="n">context</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.email_alert"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.email_alert">[docs]</a> <span class="k">def</span> <span class="nf">email_alert</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">exception</span><span class="p">):</span> |
| <span class="n">exception_html</span> <span class="o">=</span> <span class="nb">str</span><span class="p">(</span><span class="n">exception</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'</span><span class="se">\n</span><span class="s1">'</span><span class="p">,</span> <span class="s1">'&lt;br&gt;'</span><span class="p">)</span> |
| <span class="n">jinja_context</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_template_context</span><span class="p">()</span> |
| <span class="c1"># This function is called after changing the state</span> |
| <span class="c1"># from State.RUNNING so need to subtract 1 from self.try_number.</span> |
| <span class="n">jinja_context</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="nb">dict</span><span class="p">(</span> |
| <span class="n">exception</span><span class="o">=</span><span class="n">exception</span><span class="p">,</span> |
| <span class="n">exception_html</span><span class="o">=</span><span class="n">exception_html</span><span class="p">,</span> |
| <span class="n">try_number</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">try_number</span> <span class="o">-</span> <span class="mi">1</span><span class="p">,</span> |
| <span class="n">max_tries</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">max_tries</span><span class="p">))</span> |
| |
| <span class="n">jinja_env</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">get_template_env</span><span class="p">()</span> |
| |
| <span class="n">default_subject</span> <span class="o">=</span> <span class="s1">'Airflow alert: {{ti}}'</span> |
| <span class="c1"># For reporting purposes, we report based on 1-indexed,</span> |
| <span class="c1"># not 0-indexed lists (i.e. Try 1 instead of</span> |
| <span class="c1"># Try 0 for the first attempt).</span> |
| <span class="n">default_html_content</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="s1">'Try {{try_number}} out of {{max_tries + 1}}&lt;br&gt;'</span> |
| <span class="s1">'Exception:&lt;br&gt;{{exception_html}}&lt;br&gt;'</span> |
| <span class="s1">'Log: &lt;a href="{{ti.log_url}}"&gt;Link&lt;/a&gt;&lt;br&gt;'</span> |
| <span class="s1">'Host: {{ti.hostname}}&lt;br&gt;'</span> |
| <span class="s1">'Log file: {{ti.log_filepath}}&lt;br&gt;'</span> |
| <span class="s1">'Mark success: &lt;a href="{{ti.mark_success_url}}"&gt;Link&lt;/a&gt;&lt;br&gt;'</span> |
| <span class="p">)</span> |
| |
| <span class="k">def</span> <span class="nf">render</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">content</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">conf</span><span class="o">.</span><span class="n">has_option</span><span class="p">(</span><span class="s1">'email'</span><span class="p">,</span> <span class="n">key</span><span class="p">):</span> |
| <span class="n">path</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'email'</span><span class="p">,</span> <span class="n">key</span><span class="p">)</span> |
| <span class="k">with</span> <span class="nb">open</span><span class="p">(</span><span class="n">path</span><span class="p">)</span> <span class="k">as</span> <span class="n">f</span><span class="p">:</span> |
| <span class="n">content</span> <span class="o">=</span> <span class="n">f</span><span class="o">.</span><span class="n">read</span><span class="p">()</span> |
| |
| <span class="k">return</span> <span class="n">jinja_env</span><span class="o">.</span><span class="n">from_string</span><span class="p">(</span><span class="n">content</span><span class="p">)</span><span class="o">.</span><span class="n">render</span><span class="p">(</span><span class="o">**</span><span class="n">jinja_context</span><span class="p">)</span> |
| |
| <span class="n">subject</span> <span class="o">=</span> <span class="n">render</span><span class="p">(</span><span class="s1">'subject_template'</span><span class="p">,</span> <span class="n">default_subject</span><span class="p">)</span> |
| <span class="n">html_content</span> <span class="o">=</span> <span class="n">render</span><span class="p">(</span><span class="s1">'html_content_template'</span><span class="p">,</span> <span class="n">default_html_content</span><span class="p">)</span> |
| <span class="n">send_email</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">task</span><span class="o">.</span><span class="n">email</span><span class="p">,</span> <span class="n">subject</span><span class="p">,</span> <span class="n">html_content</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.set_duration"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.set_duration">[docs]</a> <span class="k">def</span> <span class="nf">set_duration</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">duration</span> <span class="o">=</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">-</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span><span class="p">)</span><span class="o">.</span><span class="n">total_seconds</span><span class="p">()</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">duration</span> <span class="o">=</span> <span class="kc">None</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.xcom_push"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.xcom_push">[docs]</a> <span class="k">def</span> <span class="nf">xcom_push</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">key</span><span class="p">,</span> |
| <span class="n">value</span><span class="p">,</span> |
| <span class="n">execution_date</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Make an XCom available for tasks to pull.</span> |
| |
| <span class="sd"> :param key: A key for the XCom</span> |
| <span class="sd"> :type key: str</span> |
| <span class="sd"> :param value: A value for the XCom. The value is pickled and stored</span> |
| <span class="sd"> in the database.</span> |
| <span class="sd"> :type value: any pickleable object</span> |
| <span class="sd"> :param execution_date: if provided, the XCom will not be visible until</span> |
| <span class="sd"> this date. This can be used, for example, to send a message to a</span> |
| <span class="sd"> task on a future date without it being immediately visible.</span> |
| <span class="sd"> :type execution_date: datetime</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">if</span> <span class="n">execution_date</span> <span class="ow">and</span> <span class="n">execution_date</span> <span class="o"><</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="ne">ValueError</span><span class="p">(</span> |
| <span class="s1">'execution_date can not be in the past (current '</span> |
| <span class="s1">'execution_date is </span><span class="si">{}</span><span class="s1">; received </span><span class="si">{}</span><span class="s1">)'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">))</span> |
| |
| <span class="n">XCom</span><span class="o">.</span><span class="n">set</span><span class="p">(</span> |
| <span class="n">key</span><span class="o">=</span><span class="n">key</span><span class="p">,</span> |
| <span class="n">value</span><span class="o">=</span><span class="n">value</span><span class="p">,</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">execution_date</span><span class="o">=</span><span class="n">execution_date</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.xcom_pull"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.xcom_pull">[docs]</a> <span class="k">def</span> <span class="nf">xcom_pull</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">task_ids</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">key</span><span class="o">=</span><span class="n">XCOM_RETURN_KEY</span><span class="p">,</span> |
| <span class="n">include_prior_dates</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Pull XComs that optionally meet certain criteria.</span> |
| |
| <span class="sd"> The default value for `key` limits the search to XComs</span> |
| <span class="sd"> that were returned by other tasks (as opposed to those that were pushed</span> |
| <span class="sd"> manually). To remove this filter, pass key=None (or any desired value).</span> |
| |
| <span class="sd"> If a single task_id string is provided, the result is the value of the</span> |
| <span class="sd"> most recent matching XCom from that task_id. If multiple task_ids are</span> |
| <span class="sd"> provided, a tuple of matching values is returned. None is returned</span> |
| <span class="sd"> whenever no matches are found.</span> |
| |
| <span class="sd"> :param key: A key for the XCom. If provided, only XComs with matching</span> |
| <span class="sd"> keys will be returned. The default key is 'return_value', also</span> |
| <span class="sd"> available as a constant XCOM_RETURN_KEY. This key is automatically</span> |
| <span class="sd"> given to XComs returned by tasks (as opposed to being pushed</span> |
| <span class="sd"> manually). To remove the filter, pass key=None.</span> |
| <span class="sd"> :type key: str</span> |
| <span class="sd"> :param task_ids: Only XComs from tasks with matching ids will be</span> |
| <span class="sd"> pulled. Can pass None to remove the filter.</span> |
| <span class="sd"> :type task_ids: str or iterable of strings (representing task_ids)</span> |
| <span class="sd"> :param dag_id: If provided, only pulls XComs from this DAG.</span> |
| <span class="sd"> If None (default), the DAG of the calling task is used.</span> |
| <span class="sd"> :type dag_id: str</span> |
| <span class="sd"> :param include_prior_dates: If False, only XComs from the current</span> |
| <span class="sd"> execution_date are returned. If True, XComs from previous dates</span> |
| <span class="sd"> are returned as well.</span> |
| <span class="sd"> :type include_prior_dates: bool</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">if</span> <span class="n">dag_id</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="n">dag_id</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span> |
| |
| <span class="n">pull_fn</span> <span class="o">=</span> <span class="n">functools</span><span class="o">.</span><span class="n">partial</span><span class="p">(</span> |
| <span class="n">XCom</span><span class="o">.</span><span class="n">get_one</span><span class="p">,</span> |
| <span class="n">execution_date</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> |
| <span class="n">key</span><span class="o">=</span><span class="n">key</span><span class="p">,</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">include_prior_dates</span><span class="o">=</span><span class="n">include_prior_dates</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">is_container</span><span class="p">(</span><span class="n">task_ids</span><span class="p">):</span> |
| <span class="k">return</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">pull_fn</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="n">t</span><span class="p">)</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">task_ids</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">pull_fn</span><span class="p">(</span><span class="n">task_id</span><span class="o">=</span><span class="n">task_ids</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.get_num_running_task_instances"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.get_num_running_task_instances">[docs]</a> <span class="nd">@provide_session</span> |
| <span class="k">def</span> <span class="nf">get_num_running_task_instances</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="p">):</span> |
| <span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span> |
| <span class="k">return</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> |
| <span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">TI</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> |
| <span class="n">TI</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">count</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="TaskInstance.init_run_context"><a class="viewcode-back" href="../../../_api/airflow/models/taskinstance/index.html#airflow.models.TaskInstance.init_run_context">[docs]</a> <span class="k">def</span> <span class="nf">init_run_context</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">raw</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Sets the log context.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">raw</span> <span class="o">=</span> <span class="n">raw</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_set_context</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span></div></div> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| |
| |
| <footer> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| |
| </p> |
| </div> |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| <div class="footer">This page uses <a href="https://analytics.google.com/"> |
| Google Analytics</a> to collect statistics. You can disable it by blocking |
| the JavaScript coming from www.google-analytics.com or ssl.google-analytics.com. Check our |
| <a href="../../../privacy_notice.html">Privacy Policy</a> |
| for more details. |
| <script type="text/javascript"> |
| // Inject the Google Analytics ga.js script, marked async. |
| (function() { |
| var ga = document.createElement('script'); |
| // Use the "ssl" host on HTTPS pages and the "www" host otherwise. |
| ga.src = ('https:' === document.location.protocol ? |
| 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js'; |
| ga.setAttribute('async', 'true'); |
| // Append to the first element child of the root element. firstElementChild |
| // skips text and comment nodes, so no manual childNodes scan is needed. |
| document.documentElement.firstElementChild.appendChild(ga); |
| })(); |
| </script> |
| </div> |
| |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script type="text/javascript"> |
| // When jQuery reports the document ready, call the theme's |
| // Navigation.enable() with true. |
| jQuery(document).ready(function enableThemeNavigation() { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |