blob: c93a34f71a56001b5907cdffa565416bc2ea1425 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>airflow.models.baseoperator &mdash; Airflow Documentation</title>
<script type="text/javascript" src="../../../_static/js/modernizr.min.js"></script>
<script type="text/javascript" id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script>
<script type="text/javascript" src="../../../_static/jquery.js"></script>
<script type="text/javascript" src="../../../_static/underscore.js"></script>
<script type="text/javascript" src="../../../_static/doctools.js"></script>
<script type="text/javascript" src="../../../_static/language_data.js"></script>
<script type="text/javascript" src="../../../_static/js/theme.js"></script>
<link rel="stylesheet" href="../../../_static/css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" />
<link rel="index" title="Index" href="../../../genindex.html" />
<link rel="search" title="Search" href="../../../search.html" />
<script>
document.addEventListener('DOMContentLoaded', function() {
var el = document.getElementById('changelog');
if (el !== null ) {
// [AIRFLOW-...]
el.innerHTML = el.innerHTML.replace(
/\[(AIRFLOW-[\d]+)\]/g,
`<a href="https://issues.apache.org/jira/browse/$1">[$1]</a>`
);
// (#...)
el.innerHTML = el.innerHTML.replace(
/\(#([\d]+)\)/g,
`<a href="https://github.com/apache/airflow/pull/$1">(#$1)</a>`
);
};
})
</script>
<script type="text/javascript">
var _gaq = _gaq || [];
_gaq.push(['_setAccount', 'UA-140539454-1']);
_gaq.push(['_trackPageview']);
</script>
<style>
.example-header {
position: relative;
background: #9AAA7A;
padding: 8px 16px;
margin-bottom: 0;
}
.example-header--with-button {
padding-right: 166px;
}
.example-header:after{
content: '';
display: table;
clear: both;
}
.example-title {
display:block;
padding: 4px;
margin-right: 16px;
color: white;
overflow-x: auto;
}
.example-header-button {
top: 8px;
right: 16px;
position: absolute;
}
.example-header + .highlight-python {
margin-top: 0 !important;
}
.viewcode-button {
display: inline-block;
padding: 8px 16px;
border: 0;
margin: 0;
outline: 0;
border-radius: 2px;
-webkit-box-shadow: 0 3px 5px 0 rgba(0,0,0,.3);
box-shadow: 0 3px 6px 0 rgba(0,0,0,.3);
color: #404040;
background-color: #e7e7e7;
cursor: pointer;
font-size: 16px;
font-weight: 500;
line-height: 1;
text-decoration: none;
text-overflow: ellipsis;
overflow: hidden;
text-transform: uppercase;
-webkit-transition: background-color .2s;
transition: background-color .2s;
vertical-align: middle;
white-space: nowrap;
}
.viewcode-button:visited {
color: #404040;
}
.viewcode-button:hover, .viewcode-button:focus {
color: #404040;
background-color: #d6d6d6;
}
</style>
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search" >
<a href="../../../index.html" class="icon icon-home"> Airflow
</a>
<div class="version">
1.10.5
</div>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../project.html">Project</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../license.html">License</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../start.html">Quick Start</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../installation.html">Installation</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../tutorial.html">Tutorial</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../howto/index.html">How-to Guides</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../ui.html">UI / Screenshots</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../concepts.html">Concepts</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../profiling.html">Data Profiling</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../cli.html">Command Line Interface Reference</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../scheduler.html">Scheduling &amp; Triggers</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../plugins.html">Plugins</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../security.html">Security</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../timezone.html">Time zones</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../api.html">REST API Reference</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../integration.html">Integration</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../metrics.html">Metrics</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../kubernetes.html">Kubernetes</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../lineage.html">Lineage</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../changelog.html">Changelog</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../faq.html">FAQ</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../macros.html">Macros reference</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../_api/index.html">Python API Reference</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../privacy_notice.html">Privacy Notice</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../index.html">Airflow</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../index.html">Module code</a> &raquo;</li>
<li><a href="../models.html">airflow.models</a> &raquo;</li>
<li>airflow.models.baseoperator</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for airflow.models.baseoperator</h1><div class="highlight"><pre>
<span></span><span class="c1"># -*- coding: utf-8 -*-</span>
<span class="c1">#</span>
<span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span>
<span class="c1"># or more contributor license agreements. See the NOTICE file</span>
<span class="c1"># distributed with this work for additional information</span>
<span class="c1"># regarding copyright ownership. The ASF licenses this file</span>
<span class="c1"># to you under the Apache License, Version 2.0 (the</span>
<span class="c1"># &quot;License&quot;); you may not use this file except in compliance</span>
<span class="c1"># with the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing,</span>
<span class="c1"># software distributed under the License is distributed on an</span>
<span class="c1"># &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
<span class="c1"># KIND, either express or implied. See the License for the</span>
<span class="c1"># specific language governing permissions and limitations</span>
<span class="c1"># under the License.</span>
<span class="kn">from</span> <span class="nn">abc</span> <span class="k">import</span> <span class="n">ABCMeta</span><span class="p">,</span> <span class="n">abstractmethod</span>
<span class="kn">from</span> <span class="nn">cached_property</span> <span class="k">import</span> <span class="n">cached_property</span>
<span class="kn">import</span> <span class="nn">copy</span>
<span class="kn">import</span> <span class="nn">functools</span>
<span class="kn">import</span> <span class="nn">logging</span>
<span class="kn">import</span> <span class="nn">sys</span>
<span class="kn">import</span> <span class="nn">warnings</span>
<span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">timedelta</span><span class="p">,</span> <span class="n">datetime</span>
<span class="kn">from</span> <span class="nn">typing</span> <span class="k">import</span> <span class="n">Iterable</span><span class="p">,</span> <span class="n">Optional</span><span class="p">,</span> <span class="n">Dict</span><span class="p">,</span> <span class="n">Callable</span><span class="p">,</span> <span class="n">Set</span>
<span class="kn">import</span> <span class="nn">jinja2</span>
<span class="kn">import</span> <span class="nn">six</span>
<span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">configuration</span><span class="p">,</span> <span class="n">settings</span>
<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowException</span>
<span class="kn">from</span> <span class="nn">airflow.lineage</span> <span class="k">import</span> <span class="n">prepare_lineage</span><span class="p">,</span> <span class="n">apply_lineage</span><span class="p">,</span> <span class="n">DataSet</span>
<span class="kn">from</span> <span class="nn">airflow.models.dag</span> <span class="k">import</span> <span class="n">DAG</span>
<span class="kn">from</span> <span class="nn">airflow.models.pool</span> <span class="k">import</span> <span class="n">Pool</span>
<span class="kn">from</span> <span class="nn">airflow.models.taskinstance</span> <span class="k">import</span> <span class="n">TaskInstance</span><span class="p">,</span> <span class="n">clear_task_instances</span>
<span class="kn">from</span> <span class="nn">airflow.models.xcom</span> <span class="k">import</span> <span class="n">XCOM_RETURN_KEY</span>
<span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.not_in_retry_period_dep</span> <span class="k">import</span> <span class="n">NotInRetryPeriodDep</span>
<span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.prev_dagrun_dep</span> <span class="k">import</span> <span class="n">PrevDagrunDep</span>
<span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.trigger_rule_dep</span> <span class="k">import</span> <span class="n">TriggerRuleDep</span>
<span class="kn">from</span> <span class="nn">airflow.utils</span> <span class="k">import</span> <span class="n">timezone</span>
<span class="kn">from</span> <span class="nn">airflow.utils.db</span> <span class="k">import</span> <span class="n">provide_session</span>
<span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span>
<span class="kn">from</span> <span class="nn">airflow.utils.helpers</span> <span class="k">import</span> <span class="n">validate_key</span>
<span class="kn">from</span> <span class="nn">airflow.utils.log.logging_mixin</span> <span class="k">import</span> <span class="n">LoggingMixin</span>
<span class="kn">from</span> <span class="nn">airflow.utils.operator_resources</span> <span class="k">import</span> <span class="n">Resources</span>
<span class="kn">from</span> <span class="nn">airflow.utils.trigger_rule</span> <span class="k">import</span> <span class="n">TriggerRule</span>
<span class="kn">from</span> <span class="nn">airflow.utils.weight_rule</span> <span class="k">import</span> <span class="n">WeightRule</span>
<div class="viewcode-block" id="BaseOperator"><a class="viewcode-back" href="../../../_api/airflow/models/index.html#airflow.models.baseoperator.BaseOperator">[docs]</a><span class="nd">@functools</span><span class="o">.</span><span class="n">total_ordering</span>
<span class="k">class</span> <span class="nc">BaseOperator</span><span class="p">(</span><span class="n">LoggingMixin</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Abstract base class for all operators. Since operators create objects that</span>
<span class="sd"> become nodes in the dag, BaseOperator contains many recursive methods for</span>
<span class="sd"> dag crawling behavior. To derive this class, you are expected to override</span>
<span class="sd"> the constructor as well as the &#39;execute&#39; method.</span>
<span class="sd"> Operators derived from this class should perform or trigger certain tasks</span>
<span class="sd"> synchronously (wait for completion). Example of operators could be an</span>
<span class="sd"> operator that runs a Pig job (PigOperator), a sensor operator that</span>
<span class="sd"> waits for a partition to land in Hive (HiveSensorOperator), or one that</span>
<span class="sd"> moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these</span>
<span class="sd"> operators (tasks) target specific operations, running specific scripts,</span>
<span class="sd"> functions or data transfers.</span>
<span class="sd"> This class is abstract and shouldn&#39;t be instantiated. Instantiating a</span>
<span class="sd"> class derived from this one results in the creation of a task object,</span>
<span class="sd"> which ultimately becomes a node in DAG objects. Task dependencies should</span>
<span class="sd"> be set by using the set_upstream and/or set_downstream methods.</span>
<span class="sd"> :param task_id: a unique, meaningful id for the task</span>
<span class="sd"> :type task_id: str</span>
<span class="sd"> :param owner: the owner of the task, using the unix username is recommended</span>
<span class="sd"> :type owner: str</span>
<span class="sd"> :param retries: the number of retries that should be performed before</span>
<span class="sd"> failing the task</span>
<span class="sd"> :type retries: int</span>
<span class="sd"> :param retry_delay: delay between retries</span>
<span class="sd"> :type retry_delay: datetime.timedelta</span>
<span class="sd"> :param retry_exponential_backoff: allow progressive longer waits between</span>
<span class="sd"> retries by using exponential backoff algorithm on retry delay (delay</span>
<span class="sd"> will be converted into seconds)</span>
<span class="sd"> :type retry_exponential_backoff: bool</span>
<span class="sd"> :param max_retry_delay: maximum delay interval between retries</span>
<span class="sd"> :type max_retry_delay: datetime.timedelta</span>
<span class="sd"> :param start_date: The ``start_date`` for the task, determines</span>
<span class="sd"> the ``execution_date`` for the first task instance. The best practice</span>
<span class="sd"> is to have the start_date rounded</span>
<span class="sd"> to your DAG&#39;s ``schedule_interval``. Daily jobs have their start_date</span>
<span class="sd"> some day at 00:00:00, hourly jobs have their start_date at 00:00</span>
<span class="sd"> of a specific hour. Note that Airflow simply looks at the latest</span>
<span class="sd"> ``execution_date`` and adds the ``schedule_interval`` to determine</span>
<span class="sd"> the next ``execution_date``. It is also very important</span>
<span class="sd"> to note that different tasks&#39; dependencies</span>
<span class="sd"> need to line up in time. If task A depends on task B and their</span>
<span class="sd"> start_date are offset in a way that their execution_date don&#39;t line</span>
<span class="sd"> up, A&#39;s dependencies will never be met. If you are looking to delay</span>
<span class="sd"> a task, for example running a daily task at 2AM, look into the</span>
<span class="sd"> ``TimeSensor`` and ``TimeDeltaSensor``. We advise against using</span>
<span class="sd"> dynamic ``start_date`` and recommend using fixed ones. Read the</span>
<span class="sd"> FAQ entry about start_date for more information.</span>
<span class="sd"> :type start_date: datetime.datetime</span>
<span class="sd"> :param end_date: if specified, the scheduler won&#39;t go beyond this date</span>
<span class="sd"> :type end_date: datetime.datetime</span>
<span class="sd"> :param depends_on_past: when set to true, task instances will run</span>
<span class="sd"> sequentially while relying on the previous task&#39;s schedule to</span>
<span class="sd"> succeed. The task instance for the start_date is allowed to run.</span>
<span class="sd"> :type depends_on_past: bool</span>
<span class="sd"> :param wait_for_downstream: when set to true, an instance of task</span>
<span class="sd"> X will wait for tasks immediately downstream of the previous instance</span>
<span class="sd"> of task X to finish successfully before it runs. This is useful if the</span>
<span class="sd"> different instances of a task X alter the same asset, and this asset</span>
<span class="sd"> is used by tasks downstream of task X. Note that depends_on_past</span>
<span class="sd"> is forced to True wherever wait_for_downstream is used.</span>
<span class="sd"> :type wait_for_downstream: bool</span>
<span class="sd"> :param queue: which queue to target when running this job. Not</span>
<span class="sd"> all executors implement queue management, the CeleryExecutor</span>
<span class="sd"> does support targeting specific queues.</span>
<span class="sd"> :type queue: str</span>
<span class="sd"> :param dag: a reference to the dag the task is attached to (if any)</span>
<span class="sd"> :type dag: airflow.models.DAG</span>
<span class="sd"> :param priority_weight: priority weight of this task against other task.</span>
<span class="sd"> This allows the executor to trigger higher priority tasks before</span>
<span class="sd"> others when things get backed up. Set priority_weight as a higher</span>
<span class="sd"> number for more important tasks.</span>
<span class="sd"> :type priority_weight: int</span>
<span class="sd"> :param weight_rule: weighting method used for the effective total</span>
<span class="sd"> priority weight of the task. Options are:</span>
<span class="sd"> ``{ downstream | upstream | absolute }`` default is ``downstream``</span>
<span class="sd"> When set to ``downstream`` the effective weight of the task is the</span>
<span class="sd"> aggregate sum of all downstream descendants. As a result, upstream</span>
<span class="sd"> tasks will have higher weight and will be scheduled more aggressively</span>
<span class="sd"> when using positive weight values. This is useful when you have</span>
<span class="sd"> multiple dag run instances and desire to have all upstream tasks to</span>
<span class="sd"> complete for all runs before each dag can continue processing</span>
<span class="sd"> downstream tasks. When set to ``upstream`` the effective weight is the</span>
<span class="sd"> aggregate sum of all upstream ancestors. This is the opposite where</span>
<span class="sd"> downtream tasks have higher weight and will be scheduled more</span>
<span class="sd"> aggressively when using positive weight values. This is useful when you</span>
<span class="sd"> have multiple dag run instances and prefer to have each dag complete</span>
<span class="sd"> before starting upstream tasks of other dags. When set to</span>
<span class="sd"> ``absolute``, the effective weight is the exact ``priority_weight``</span>
<span class="sd"> specified without additional weighting. You may want to do this when</span>
<span class="sd"> you know exactly what priority weight each task should have.</span>
<span class="sd"> Additionally, when set to ``absolute``, there is bonus effect of</span>
<span class="sd"> significantly speeding up the task creation process as for very large</span>
<span class="sd"> DAGS. Options can be set as string or using the constants defined in</span>
<span class="sd"> the static class ``airflow.utils.WeightRule``</span>
<span class="sd"> :type weight_rule: str</span>
<span class="sd"> :param pool: the slot pool this task should run in, slot pools are a</span>
<span class="sd"> way to limit concurrency for certain tasks</span>
<span class="sd"> :type pool: str</span>
<span class="sd"> :param sla: time by which the job is expected to succeed. Note that</span>
<span class="sd"> this represents the ``timedelta`` after the period is closed. For</span>
<span class="sd"> example if you set an SLA of 1 hour, the scheduler would send an email</span>
<span class="sd"> soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance</span>
<span class="sd"> has not succeeded yet.</span>
<span class="sd"> The scheduler pays special attention for jobs with an SLA and</span>
<span class="sd"> sends alert</span>
<span class="sd"> emails for sla misses. SLA misses are also recorded in the database</span>
<span class="sd"> for future reference. All tasks that share the same SLA time</span>
<span class="sd"> get bundled in a single email, sent soon after that time. SLA</span>
<span class="sd"> notification are sent once and only once for each task instance.</span>
<span class="sd"> :type sla: datetime.timedelta</span>
<span class="sd"> :param execution_timeout: max time allowed for the execution of</span>
<span class="sd"> this task instance, if it goes beyond it will raise and fail.</span>
<span class="sd"> :type execution_timeout: datetime.timedelta</span>
<span class="sd"> :param on_failure_callback: a function to be called when a task instance</span>
<span class="sd"> of this task fails. a context dictionary is passed as a single</span>
<span class="sd"> parameter to this function. Context contains references to related</span>
<span class="sd"> objects to the task instance and is documented under the macros</span>
<span class="sd"> section of the API.</span>
<span class="sd"> :type on_failure_callback: callable</span>
<span class="sd"> :param on_retry_callback: much like the ``on_failure_callback`` except</span>
<span class="sd"> that it is executed when retries occur.</span>
<span class="sd"> :type on_retry_callback: callable</span>
<span class="sd"> :param on_success_callback: much like the ``on_failure_callback`` except</span>
<span class="sd"> that it is executed when the task succeeds.</span>
<span class="sd"> :type on_success_callback: callable</span>
<span class="sd"> :param trigger_rule: defines the rule by which dependencies are applied</span>
<span class="sd"> for the task to get triggered. Options are:</span>
<span class="sd"> ``{ all_success | all_failed | all_done | one_success |</span>
<span class="sd"> one_failed | none_failed | none_skipped | dummy}``</span>
<span class="sd"> default is ``all_success``. Options can be set as string or</span>
<span class="sd"> using the constants defined in the static class</span>
<span class="sd"> ``airflow.utils.TriggerRule``</span>
<span class="sd"> :type trigger_rule: str</span>
<span class="sd"> :param resources: A map of resource parameter names (the argument names of the</span>
<span class="sd"> Resources constructor) to their values.</span>
<span class="sd"> :type resources: dict</span>
<span class="sd"> :param run_as_user: unix username to impersonate while running the task</span>
<span class="sd"> :type run_as_user: str</span>
<span class="sd"> :param task_concurrency: When set, a task will be able to limit the concurrent</span>
<span class="sd"> runs across execution_dates</span>
<span class="sd"> :type task_concurrency: int</span>
<span class="sd"> :param executor_config: Additional task-level configuration parameters that are</span>
<span class="sd"> interpreted by a specific executor. Parameters are namespaced by the name of</span>
<span class="sd"> executor.</span>
<span class="sd"> **Example**: to run this task in a specific docker container through</span>
<span class="sd"> the KubernetesExecutor ::</span>
<span class="sd"> MyOperator(...,</span>
<span class="sd"> executor_config={</span>
<span class="sd"> &quot;KubernetesExecutor&quot;:</span>
<span class="sd"> {&quot;image&quot;: &quot;myCustomDockerImage&quot;}</span>
<span class="sd"> }</span>
<span class="sd"> )</span>
<span class="sd"> :type executor_config: dict</span>
<span class="sd"> :param do_xcom_push: if True, an XCom is pushed containing the Operator&#39;s</span>
<span class="sd"> result</span>
<span class="sd"> :type do_xcom_push: bool</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="c1"># For derived classes to define which fields will get jinjaified</span>
<div class="viewcode-block" id="BaseOperator.template_fields"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.template_fields">[docs]</a> <span class="n">template_fields</span> <span class="o">=</span> <span class="p">[]</span> <span class="c1"># type: Iterable[str]</span></div>
<span class="c1"># Defines which files extensions to look for in the templated fields</span>
<div class="viewcode-block" id="BaseOperator.template_ext"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.template_ext">[docs]</a> <span class="n">template_ext</span> <span class="o">=</span> <span class="p">[]</span> <span class="c1"># type: Iterable[str]</span></div>
<span class="c1"># Defines the color in the UI</span>
<div class="viewcode-block" id="BaseOperator.ui_color"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.ui_color">[docs]</a> <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">&#39;#fff&#39;</span></div>
<div class="viewcode-block" id="BaseOperator.ui_fgcolor"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.ui_fgcolor">[docs]</a> <span class="n">ui_fgcolor</span> <span class="o">=</span> <span class="s1">&#39;#000&#39;</span></div>
<span class="c1"># base list which includes all the attrs that don&#39;t need deep copy.</span>
<div class="viewcode-block" id="BaseOperator._base_operator_shallow_copy_attrs"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator._base_operator_shallow_copy_attrs">[docs]</a> <span class="n">_base_operator_shallow_copy_attrs</span> <span class="o">=</span> <span class="p">(</span><span class="s1">&#39;user_defined_macros&#39;</span><span class="p">,</span>
<span class="s1">&#39;user_defined_filters&#39;</span><span class="p">,</span>
<span class="s1">&#39;params&#39;</span><span class="p">,</span>
<span class="s1">&#39;_log&#39;</span><span class="p">,)</span></div>
<span class="c1"># each operator should override this class attr for shallow copy attrs.</span>
<div class="viewcode-block" id="BaseOperator.shallow_copy_attrs"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.shallow_copy_attrs">[docs]</a> <span class="n">shallow_copy_attrs</span> <span class="o">=</span> <span class="p">()</span> <span class="c1"># type: Iterable[str]</span></div>
<span class="c1"># Defines the operator level extra links</span>
<div class="viewcode-block" id="BaseOperator.operator_extra_links"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.operator_extra_links">[docs]</a> <span class="n">operator_extra_links</span> <span class="o">=</span> <span class="p">()</span> <span class="c1"># type: Iterable[BaseOperatorLink]</span></div>
<div class="viewcode-block" id="BaseOperator._comps"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator._comps">[docs]</a> <span class="n">_comps</span> <span class="o">=</span> <span class="p">{</span>
<span class="s1">&#39;task_id&#39;</span><span class="p">,</span>
<span class="s1">&#39;dag_id&#39;</span><span class="p">,</span>
<span class="s1">&#39;owner&#39;</span><span class="p">,</span>
<span class="s1">&#39;email&#39;</span><span class="p">,</span>
<span class="s1">&#39;email_on_retry&#39;</span><span class="p">,</span>
<span class="s1">&#39;retry_delay&#39;</span><span class="p">,</span>
<span class="s1">&#39;retry_exponential_backoff&#39;</span><span class="p">,</span>
<span class="s1">&#39;max_retry_delay&#39;</span><span class="p">,</span>
<span class="s1">&#39;start_date&#39;</span><span class="p">,</span>
<span class="s1">&#39;schedule_interval&#39;</span><span class="p">,</span>
<span class="s1">&#39;depends_on_past&#39;</span><span class="p">,</span>
<span class="s1">&#39;wait_for_downstream&#39;</span><span class="p">,</span>
<span class="s1">&#39;priority_weight&#39;</span><span class="p">,</span>
<span class="s1">&#39;sla&#39;</span><span class="p">,</span>
<span class="s1">&#39;execution_timeout&#39;</span><span class="p">,</span>
<span class="s1">&#39;on_failure_callback&#39;</span><span class="p">,</span>
<span class="s1">&#39;on_success_callback&#39;</span><span class="p">,</span>
<span class="s1">&#39;on_retry_callback&#39;</span><span class="p">,</span>
<span class="s1">&#39;do_xcom_push&#39;</span><span class="p">,</span></div>
<span class="p">}</span>
<span class="nd">@apply_defaults</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">task_id</span><span class="p">,</span> <span class="c1"># type: str</span>
<span class="n">owner</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;operators&#39;</span><span class="p">,</span> <span class="s1">&#39;DEFAULT_OWNER&#39;</span><span class="p">),</span> <span class="c1"># type: str</span>
<span class="n">email</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[str]</span>
<span class="n">email_on_retry</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="c1"># type: bool</span>
<span class="n">email_on_failure</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="c1"># type: bool</span>
<span class="n">retries</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> <span class="c1"># type: int</span>
<span class="n">retry_delay</span><span class="o">=</span><span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="mi">300</span><span class="p">),</span> <span class="c1"># type: timedelta</span>
<span class="n">retry_exponential_backoff</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="c1"># type: bool</span>
<span class="n">max_retry_delay</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[datetime]</span>
<span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[datetime]</span>
<span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[datetime]</span>
<span class="n">schedule_interval</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># not hooked as of now</span>
<span class="n">depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="c1"># type: bool</span>
<span class="n">wait_for_downstream</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="c1"># type: bool</span>
<span class="n">dag</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[DAG]</span>
<span class="n">params</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span>
<span class="n">default_args</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span>
<span class="n">priority_weight</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span> <span class="c1"># type: int</span>
<span class="n">weight_rule</span><span class="o">=</span><span class="n">WeightRule</span><span class="o">.</span><span class="n">DOWNSTREAM</span><span class="p">,</span> <span class="c1"># type: str</span>
<span class="n">queue</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;celery&#39;</span><span class="p">,</span> <span class="s1">&#39;default_queue&#39;</span><span class="p">),</span> <span class="c1"># type: str</span>
<span class="n">pool</span><span class="o">=</span><span class="n">Pool</span><span class="o">.</span><span class="n">DEFAULT_POOL_NAME</span><span class="p">,</span> <span class="c1"># type: str</span>
<span class="n">sla</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[timedelta]</span>
<span class="n">execution_timeout</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[timedelta]</span>
<span class="n">on_failure_callback</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Callable]</span>
<span class="n">on_success_callback</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Callable]</span>
<span class="n">on_retry_callback</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Callable]</span>
<span class="n">trigger_rule</span><span class="o">=</span><span class="n">TriggerRule</span><span class="o">.</span><span class="n">ALL_SUCCESS</span><span class="p">,</span> <span class="c1"># type: str</span>
<span class="n">resources</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span>
<span class="n">run_as_user</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[str]</span>
<span class="n">task_concurrency</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[int]</span>
<span class="n">executor_config</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span>
<span class="n">do_xcom_push</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="c1"># type: bool</span>
<span class="n">inlets</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span>
<span class="n">outlets</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span>
<span class="o">*</span><span class="n">args</span><span class="p">,</span>
<span class="o">**</span><span class="n">kwargs</span>
<span class="p">):</span>
<span class="k">if</span> <span class="n">args</span> <span class="ow">or</span> <span class="n">kwargs</span><span class="p">:</span>
<span class="c1"># TODO remove *args and **kwargs in Airflow 2.0</span>
<span class="n">warnings</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span>
<span class="s1">&#39;Invalid arguments were passed to </span><span class="si">{c}</span><span class="s1"> (task_id: </span><span class="si">{t}</span><span class="s1">). &#39;</span>
<span class="s1">&#39;Support for passing such arguments will be dropped in &#39;</span>
<span class="s1">&#39;Airflow 2.0. Invalid arguments were:&#39;</span>
<span class="s1">&#39;</span><span class="se">\n</span><span class="s1">*args: </span><span class="si">{a}</span><span class="se">\n</span><span class="s1">**kwargs: </span><span class="si">{k}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="n">c</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">a</span><span class="o">=</span><span class="n">args</span><span class="p">,</span> <span class="n">k</span><span class="o">=</span><span class="n">kwargs</span><span class="p">,</span> <span class="n">t</span><span class="o">=</span><span class="n">task_id</span><span class="p">),</span>
<span class="n">category</span><span class="o">=</span><span class="ne">PendingDeprecationWarning</span><span class="p">,</span>
<span class="n">stacklevel</span><span class="o">=</span><span class="mi">3</span>
<span class="p">)</span>
<span class="n">validate_key</span><span class="p">(</span><span class="n">task_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="o">=</span> <span class="n">task_id</span>
<span class="bp">self</span><span class="o">.</span><span class="n">owner</span> <span class="o">=</span> <span class="n">owner</span>
<span class="bp">self</span><span class="o">.</span><span class="n">email</span> <span class="o">=</span> <span class="n">email</span>
<span class="bp">self</span><span class="o">.</span><span class="n">email_on_retry</span> <span class="o">=</span> <span class="n">email_on_retry</span>
<span class="bp">self</span><span class="o">.</span><span class="n">email_on_failure</span> <span class="o">=</span> <span class="n">email_on_failure</span>
<span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">start_date</span>
<span class="k">if</span> <span class="n">start_date</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">start_date</span><span class="p">,</span> <span class="n">datetime</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">&quot;start_date for </span><span class="si">%s</span><span class="s2"> isn&#39;t datetime.datetime&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">start_date</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="n">start_date</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">end_date</span>
<span class="k">if</span> <span class="n">end_date</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="n">end_date</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">TriggerRule</span><span class="o">.</span><span class="n">is_valid</span><span class="p">(</span><span class="n">trigger_rule</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
<span class="s2">&quot;The trigger_rule must be one of </span><span class="si">{all_triggers}</span><span class="s2">,&quot;</span>
<span class="s2">&quot;&#39;</span><span class="si">{d}</span><span class="s2">.</span><span class="si">{t}</span><span class="s2">&#39;; received &#39;</span><span class="si">{tr}</span><span class="s2">&#39;.&quot;</span>
<span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">all_triggers</span><span class="o">=</span><span class="n">TriggerRule</span><span class="o">.</span><span class="n">all_triggers</span><span class="p">(),</span>
<span class="n">d</span><span class="o">=</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">if</span> <span class="n">dag</span> <span class="k">else</span> <span class="s2">&quot;&quot;</span><span class="p">,</span> <span class="n">t</span><span class="o">=</span><span class="n">task_id</span><span class="p">,</span> <span class="n">tr</span><span class="o">=</span><span class="n">trigger_rule</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">trigger_rule</span> <span class="o">=</span> <span class="n">trigger_rule</span>
<span class="bp">self</span><span class="o">.</span><span class="n">depends_on_past</span> <span class="o">=</span> <span class="n">depends_on_past</span>
<span class="bp">self</span><span class="o">.</span><span class="n">wait_for_downstream</span> <span class="o">=</span> <span class="n">wait_for_downstream</span>
<span class="k">if</span> <span class="n">wait_for_downstream</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">depends_on_past</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">if</span> <span class="n">schedule_interval</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s2">&quot;schedule_interval is used for </span><span class="si">%s</span><span class="s2">, though it has &quot;</span>
<span class="s2">&quot;been deprecated as a task parameter, you need to &quot;</span>
<span class="s2">&quot;specify it as a DAG parameter instead&quot;</span><span class="p">,</span>
<span class="bp">self</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span> <span class="o">=</span> <span class="n">schedule_interval</span>
<span class="bp">self</span><span class="o">.</span><span class="n">retries</span> <span class="o">=</span> <span class="n">retries</span>
<span class="bp">self</span><span class="o">.</span><span class="n">queue</span> <span class="o">=</span> <span class="n">queue</span>
<span class="bp">self</span><span class="o">.</span><span class="n">pool</span> <span class="o">=</span> <span class="n">pool</span>
<span class="bp">self</span><span class="o">.</span><span class="n">sla</span> <span class="o">=</span> <span class="n">sla</span>
<span class="bp">self</span><span class="o">.</span><span class="n">execution_timeout</span> <span class="o">=</span> <span class="n">execution_timeout</span>
<span class="bp">self</span><span class="o">.</span><span class="n">on_failure_callback</span> <span class="o">=</span> <span class="n">on_failure_callback</span>
<span class="bp">self</span><span class="o">.</span><span class="n">on_success_callback</span> <span class="o">=</span> <span class="n">on_success_callback</span>
<span class="bp">self</span><span class="o">.</span><span class="n">on_retry_callback</span> <span class="o">=</span> <span class="n">on_retry_callback</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">retry_delay</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">retry_delay</span> <span class="o">=</span> <span class="n">retry_delay</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Retry_delay isn&#39;t timedelta object, assuming secs&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">retry_delay</span> <span class="o">=</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">retry_delay</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">retry_exponential_backoff</span> <span class="o">=</span> <span class="n">retry_exponential_backoff</span>
<span class="bp">self</span><span class="o">.</span><span class="n">max_retry_delay</span> <span class="o">=</span> <span class="n">max_retry_delay</span>
<span class="bp">self</span><span class="o">.</span><span class="n">params</span> <span class="o">=</span> <span class="n">params</span> <span class="ow">or</span> <span class="p">{}</span> <span class="c1"># Available in templates!</span>
<span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span> <span class="o">=</span> <span class="n">priority_weight</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">WeightRule</span><span class="o">.</span><span class="n">is_valid</span><span class="p">(</span><span class="n">weight_rule</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
<span class="s2">&quot;The weight_rule must be one of </span><span class="si">{all_weight_rules}</span><span class="s2">,&quot;</span>
<span class="s2">&quot;&#39;</span><span class="si">{d}</span><span class="s2">.</span><span class="si">{t}</span><span class="s2">&#39;; received &#39;</span><span class="si">{tr}</span><span class="s2">&#39;.&quot;</span>
<span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">all_weight_rules</span><span class="o">=</span><span class="n">WeightRule</span><span class="o">.</span><span class="n">all_weight_rules</span><span class="p">,</span>
<span class="n">d</span><span class="o">=</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">if</span> <span class="n">dag</span> <span class="k">else</span> <span class="s2">&quot;&quot;</span><span class="p">,</span> <span class="n">t</span><span class="o">=</span><span class="n">task_id</span><span class="p">,</span> <span class="n">tr</span><span class="o">=</span><span class="n">weight_rule</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">weight_rule</span> <span class="o">=</span> <span class="n">weight_rule</span>
<span class="bp">self</span><span class="o">.</span><span class="n">resources</span> <span class="o">=</span> <span class="n">Resources</span><span class="p">(</span><span class="o">*</span><span class="n">resources</span><span class="p">)</span> <span class="k">if</span> <span class="n">resources</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="k">else</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">run_as_user</span> <span class="o">=</span> <span class="n">run_as_user</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_concurrency</span> <span class="o">=</span> <span class="n">task_concurrency</span>
<span class="bp">self</span><span class="o">.</span><span class="n">executor_config</span> <span class="o">=</span> <span class="n">executor_config</span> <span class="ow">or</span> <span class="p">{}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">do_xcom_push</span> <span class="o">=</span> <span class="n">do_xcom_push</span>
<span class="c1"># Private attributes</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span> <span class="c1"># type: Set[str]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_downstream_task_ids</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span> <span class="c1"># type: Set[str]</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">dag</span> <span class="ow">and</span> <span class="n">settings</span><span class="o">.</span><span class="n">CONTEXT_MANAGER_DAG</span><span class="p">:</span>
<span class="n">dag</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">CONTEXT_MANAGER_DAG</span>
<span class="k">if</span> <span class="n">dag</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">dag</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_log</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s2">&quot;airflow.task.operators&quot;</span><span class="p">)</span>
<span class="c1"># lineage</span>
<span class="bp">self</span><span class="o">.</span><span class="n">inlets</span> <span class="o">=</span> <span class="p">[]</span> <span class="c1"># type: Iterable[DataSet]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">outlets</span> <span class="o">=</span> <span class="p">[]</span> <span class="c1"># type: Iterable[DataSet]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">lineage_data</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_inlets</span> <span class="o">=</span> <span class="p">{</span>
<span class="s2">&quot;auto&quot;</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span>
<span class="s2">&quot;task_ids&quot;</span><span class="p">:</span> <span class="p">[],</span>
<span class="s2">&quot;datasets&quot;</span><span class="p">:</span> <span class="p">[],</span>
<span class="p">}</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_outlets</span> <span class="o">=</span> <span class="p">{</span>
<span class="s2">&quot;datasets&quot;</span><span class="p">:</span> <span class="p">[],</span>
<span class="p">}</span> <span class="c1"># type: Dict</span>
<span class="k">if</span> <span class="n">inlets</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_inlets</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">inlets</span><span class="p">)</span>
<span class="k">if</span> <span class="n">outlets</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_outlets</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">outlets</span><span class="p">)</span>
<div class="viewcode-block" id="BaseOperator.__eq__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__eq__">[docs]</a> <span class="k">def</span> <span class="nf">__eq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="k">if</span> <span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">==</span> <span class="nb">type</span><span class="p">(</span><span class="n">other</span><span class="p">)</span> <span class="ow">and</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">task_id</span><span class="p">):</span>
<span class="k">return</span> <span class="nb">all</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="k">for</span> <span class="n">c</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_comps</span><span class="p">)</span>
<span class="k">return</span> <span class="kc">False</span></div>
<div class="viewcode-block" id="BaseOperator.__ne__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__ne__">[docs]</a> <span class="k">def</span> <span class="nf">__ne__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="k">return</span> <span class="ow">not</span> <span class="bp">self</span> <span class="o">==</span> <span class="n">other</span></div>
<div class="viewcode-block" id="BaseOperator.__lt__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__lt__">[docs]</a> <span class="k">def</span> <span class="nf">__lt__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="o">&lt;</span> <span class="n">other</span><span class="o">.</span><span class="n">task_id</span></div>
<div class="viewcode-block" id="BaseOperator.__hash__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__hash__">[docs]</a> <span class="k">def</span> <span class="nf">__hash__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">hash_components</span> <span class="o">=</span> <span class="p">[</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)]</span>
<span class="k">for</span> <span class="n">c</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_comps</span><span class="p">:</span>
<span class="n">val</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">c</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="nb">hash</span><span class="p">(</span><span class="n">val</span><span class="p">)</span>
<span class="n">hash_components</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">val</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="n">hash_components</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="nb">repr</span><span class="p">(</span><span class="n">val</span><span class="p">))</span>
<span class="k">return</span> <span class="nb">hash</span><span class="p">(</span><span class="nb">tuple</span><span class="p">(</span><span class="n">hash_components</span><span class="p">))</span></div>
<span class="c1"># Composing Operators -----------------------------------------------</span>
<div class="viewcode-block" id="BaseOperator.__rshift__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__rshift__">[docs]</a> <span class="k">def</span> <span class="nf">__rshift__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Implements Self &gt;&gt; Other == self.set_downstream(other)</span>
<span class="sd"> If &quot;Other&quot; is a DAG, the DAG is assigned to the Operator.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">other</span><span class="p">,</span> <span class="n">DAG</span><span class="p">):</span>
<span class="c1"># if this dag is already assigned, do nothing</span>
<span class="c1"># otherwise, do normal dag assignment</span>
<span class="k">if</span> <span class="ow">not</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">()</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="ow">is</span> <span class="n">other</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">other</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">set_downstream</span><span class="p">(</span><span class="n">other</span><span class="p">)</span>
<span class="k">return</span> <span class="n">other</span></div>
<div class="viewcode-block" id="BaseOperator.__lshift__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__lshift__">[docs]</a> <span class="k">def</span> <span class="nf">__lshift__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Implements Self &lt;&lt; Other == self.set_upstream(other)</span>
<span class="sd"> If &quot;Other&quot; is a DAG, the DAG is assigned to the Operator.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">other</span><span class="p">,</span> <span class="n">DAG</span><span class="p">):</span>
<span class="c1"># if this dag is already assigned, do nothing</span>
<span class="c1"># otherwise, do normal dag assignment</span>
<span class="k">if</span> <span class="ow">not</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">()</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="ow">is</span> <span class="n">other</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">other</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">set_upstream</span><span class="p">(</span><span class="n">other</span><span class="p">)</span>
<span class="k">return</span> <span class="n">other</span></div>
<div class="viewcode-block" id="BaseOperator.__rrshift__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__rrshift__">[docs]</a> <span class="k">def</span> <span class="nf">__rrshift__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Called for [DAG] &gt;&gt; [Operator] because DAGs don&#39;t have</span>
<span class="sd"> __rshift__ operators.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="fm">__lshift__</span><span class="p">(</span><span class="n">other</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<div class="viewcode-block" id="BaseOperator.__rlshift__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__rlshift__">[docs]</a> <span class="k">def</span> <span class="nf">__rlshift__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Called for [DAG] &lt;&lt; [Operator] because DAGs don&#39;t have</span>
<span class="sd"> __lshift__ operators.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="fm">__rshift__</span><span class="p">(</span><span class="n">other</span><span class="p">)</span>
<span class="k">return</span> <span class="bp">self</span></div>
<span class="c1"># /Composing Operators ---------------------------------------------</span>
<span class="nd">@property</span>
<div class="viewcode-block" id="BaseOperator.dag"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.dag">[docs]</a> <span class="k">def</span> <span class="nf">dag</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns the Operator&#39;s DAG if set, otherwise raises an error</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dag</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
<span class="s1">&#39;Operator </span><span class="si">{}</span><span class="s1"> has not been assigned to a DAG yet&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div>
<span class="nd">@dag</span><span class="o">.</span><span class="n">setter</span>
<span class="k">def</span> <span class="nf">dag</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dag</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Operators can be assigned to one DAG, one time. Repeat assignments to</span>
<span class="sd"> that same DAG are ok.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">dag</span><span class="p">,</span> <span class="n">DAG</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span>
<span class="s1">&#39;Expected DAG; received </span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dag</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">))</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">()</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="ow">is</span> <span class="ow">not</span> <span class="n">dag</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
<span class="s2">&quot;The DAG assigned to </span><span class="si">{}</span><span class="s2"> can not be changed.&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">task_dict</span><span class="p">:</span>
<span class="n">dag</span><span class="o">.</span><span class="n">add_task</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_dag</span> <span class="o">=</span> <span class="n">dag</span>
<div class="viewcode-block" id="BaseOperator.has_dag"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.has_dag">[docs]</a> <span class="k">def</span> <span class="nf">has_dag</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns True if the Operator has been assigned to a DAG.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;_dag&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="BaseOperator.dag_id"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.dag_id">[docs]</a> <span class="k">def</span> <span class="nf">dag_id</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="s1">&#39;adhoc_&#39;</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">owner</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="BaseOperator.deps"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.deps">[docs]</a> <span class="k">def</span> <span class="nf">deps</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Returns the list of dependencies for the operator. These differ from execution</span>
<span class="sd"> context dependencies in that they are specific to tasks and can be</span>
<span class="sd"> extended/overridden by subclasses.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="p">{</span>
<span class="n">NotInRetryPeriodDep</span><span class="p">(),</span>
<span class="n">PrevDagrunDep</span><span class="p">(),</span>
<span class="n">TriggerRuleDep</span><span class="p">(),</span></div>
<span class="p">}</span>
<span class="nd">@property</span>
<div class="viewcode-block" id="BaseOperator.schedule_interval"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.schedule_interval">[docs]</a> <span class="k">def</span> <span class="nf">schedule_interval</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> The schedule interval of the DAG always wins over individual tasks so</span>
<span class="sd"> that tasks within a DAG always line up. The task still needs a</span>
<span class="sd"> schedule_interval as it may not be attached to a DAG.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">_schedule_interval</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="BaseOperator.priority_weight_total"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.priority_weight_total">[docs]</a> <span class="k">def</span> <span class="nf">priority_weight_total</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">weight_rule</span> <span class="o">==</span> <span class="n">WeightRule</span><span class="o">.</span><span class="n">ABSOLUTE</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">weight_rule</span> <span class="o">==</span> <span class="n">WeightRule</span><span class="o">.</span><span class="n">DOWNSTREAM</span><span class="p">:</span>
<span class="n">upstream</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">weight_rule</span> <span class="o">==</span> <span class="n">WeightRule</span><span class="o">.</span><span class="n">UPSTREAM</span><span class="p">:</span>
<span class="n">upstream</span> <span class="o">=</span> <span class="kc">True</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">upstream</span> <span class="o">=</span> <span class="kc">False</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span> <span class="o">+</span> <span class="nb">sum</span><span class="p">(</span>
<span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">task_id</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dag</span><span class="o">.</span><span class="n">task_dict</span><span class="p">[</span><span class="n">task_id</span><span class="p">]</span><span class="o">.</span><span class="n">priority_weight</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">get_flat_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="n">upstream</span><span class="p">))</span></div>
<span class="p">)</span>
<span class="nd">@cached_property</span>
<div class="viewcode-block" id="BaseOperator.operator_extra_link_dict"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.operator_extra_link_dict">[docs]</a> <span class="k">def</span> <span class="nf">operator_extra_link_dict</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">:</span> <span class="n">link</span> <span class="k">for</span> <span class="n">link</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">operator_extra_links</span><span class="p">}</span></div>
<span class="nd">@cached_property</span>
<div class="viewcode-block" id="BaseOperator.global_operator_extra_link_dict"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.global_operator_extra_link_dict">[docs]</a> <span class="k">def</span> <span class="nf">global_operator_extra_link_dict</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="kn">from</span> <span class="nn">airflow.plugins_manager</span> <span class="k">import</span> <span class="n">global_operator_extra_links</span>
<span class="k">return</span> <span class="p">{</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">:</span> <span class="n">link</span> <span class="k">for</span> <span class="n">link</span> <span class="ow">in</span> <span class="n">global_operator_extra_links</span><span class="p">}</span></div>
<span class="nd">@prepare_lineage</span>
<div class="viewcode-block" id="BaseOperator.pre_execute"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.pre_execute">[docs]</a> <span class="k">def</span> <span class="nf">pre_execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> This hook is triggered right before self.execute() is called.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span></div>
<div class="viewcode-block" id="BaseOperator.execute"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.execute">[docs]</a> <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> This is the main method to derive when creating an operator.</span>
<span class="sd"> Context is the same dictionary used as when rendering jinja templates.</span>
<span class="sd"> Refer to get_template_context for more context.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">()</span></div>
<span class="nd">@apply_lineage</span>
<div class="viewcode-block" id="BaseOperator.post_execute"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.post_execute">[docs]</a> <span class="k">def</span> <span class="nf">post_execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">result</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> This hook is triggered right after self.execute() is called.</span>
<span class="sd"> It is passed the execution context and any results returned by the</span>
<span class="sd"> operator.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span></div>
<div class="viewcode-block" id="BaseOperator.on_kill"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.on_kill">[docs]</a> <span class="k">def</span> <span class="nf">on_kill</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Override this method to cleanup subprocesses when a task instance</span>
<span class="sd"> gets killed. Any use of the threading, subprocess or multiprocessing</span>
<span class="sd"> module within an operator needs to be cleaned up or it will leave</span>
<span class="sd"> ghost processes behind.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span></div>
<div class="viewcode-block" id="BaseOperator.__deepcopy__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__deepcopy__">[docs]</a> <span class="k">def</span> <span class="nf">__deepcopy__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">memo</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Hack sorting double chained task lists by task_id to avoid hitting</span>
<span class="sd"> max_depth on deepcopy operations.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">sys</span><span class="o">.</span><span class="n">setrecursionlimit</span><span class="p">(</span><span class="mi">5000</span><span class="p">)</span> <span class="c1"># TODO fix this in a better way</span>
<span class="bp">cls</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span>
<span class="n">result</span> <span class="o">=</span> <span class="bp">cls</span><span class="o">.</span><span class="fm">__new__</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span>
<span class="n">memo</span><span class="p">[</span><span class="nb">id</span><span class="p">(</span><span class="bp">self</span><span class="p">)]</span> <span class="o">=</span> <span class="n">result</span>
<span class="n">shallow_copy</span> <span class="o">=</span> <span class="bp">cls</span><span class="o">.</span><span class="n">shallow_copy_attrs</span> <span class="o">+</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_base_operator_shallow_copy_attrs</span>
<span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">items</span><span class="p">()):</span>
<span class="k">if</span> <span class="n">k</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">shallow_copy</span><span class="p">:</span>
<span class="nb">setattr</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="n">k</span><span class="p">,</span> <span class="n">copy</span><span class="o">.</span><span class="n">deepcopy</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="n">memo</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="nb">setattr</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="n">k</span><span class="p">,</span> <span class="n">copy</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">v</span><span class="p">))</span>
<span class="k">return</span> <span class="n">result</span></div>
<div class="viewcode-block" id="BaseOperator.__getstate__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__getstate__">[docs]</a> <span class="k">def</span> <span class="nf">__getstate__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">state</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">)</span>
<span class="k">del</span> <span class="n">state</span><span class="p">[</span><span class="s1">&#39;_log&#39;</span><span class="p">]</span>
<span class="k">return</span> <span class="n">state</span></div>
<div class="viewcode-block" id="BaseOperator.__setstate__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__setstate__">[docs]</a> <span class="k">def</span> <span class="nf">__setstate__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span> <span class="o">=</span> <span class="n">state</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_log</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s2">&quot;airflow.task.operators&quot;</span><span class="p">)</span></div>
<div class="viewcode-block" id="BaseOperator.render_template_from_field"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.render_template_from_field">[docs]</a> <span class="k">def</span> <span class="nf">render_template_from_field</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">attr</span><span class="p">,</span> <span class="n">content</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">jinja_env</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Renders a template from a field. If the field is a string, it will</span>
<span class="sd"> simply render the string and return the result. If it is a collection or</span>
<span class="sd"> nested set of collections, it will traverse the structure and render</span>
<span class="sd"> all elements in it. If the field has another type, it will return it as it is.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">rt</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">render_template</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">):</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">jinja_env</span><span class="o">.</span><span class="n">from_string</span><span class="p">(</span><span class="n">content</span><span class="p">)</span><span class="o">.</span><span class="n">render</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">):</span>
<span class="k">if</span> <span class="nb">type</span><span class="p">(</span><span class="n">content</span><span class="p">)</span> <span class="ow">is</span> <span class="ow">not</span> <span class="nb">tuple</span><span class="p">:</span>
<span class="c1"># Special case for named tuples</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">content</span><span class="o">.</span><span class="vm">__class__</span><span class="p">(</span><span class="o">*</span><span class="p">(</span><span class="n">rt</span><span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">e</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">content</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">result</span> <span class="o">=</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">rt</span><span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">e</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">content</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="n">result</span> <span class="o">=</span> <span class="p">[</span><span class="n">rt</span><span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">e</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">content</span><span class="p">]</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span>
<span class="n">result</span> <span class="o">=</span> <span class="p">{</span>
<span class="n">k</span><span class="p">:</span> <span class="n">rt</span><span class="p">(</span><span class="s2">&quot;</span><span class="si">{}</span><span class="s2">[</span><span class="si">{}</span><span class="s2">]&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">k</span><span class="p">),</span> <span class="n">v</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span>
<span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">content</span><span class="o">.</span><span class="n">items</span><span class="p">())}</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">result</span> <span class="o">=</span> <span class="n">content</span>
<span class="k">return</span> <span class="n">result</span></div>
<div class="viewcode-block" id="BaseOperator.render_template"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.render_template">[docs]</a> <span class="k">def</span> <span class="nf">render_template</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">attr</span><span class="p">,</span> <span class="n">content</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Renders a template either from a file or directly in a field, and returns</span>
<span class="sd"> the rendered result.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">jinja_env</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_template_env</span><span class="p">()</span>
<span class="n">exts</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="n">template_ext</span>
<span class="k">if</span> <span class="p">(</span>
<span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">)</span> <span class="ow">and</span>
<span class="nb">any</span><span class="p">([</span><span class="n">content</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="n">ext</span><span class="p">)</span> <span class="k">for</span> <span class="n">ext</span> <span class="ow">in</span> <span class="n">exts</span><span class="p">])):</span>
<span class="k">return</span> <span class="n">jinja_env</span><span class="o">.</span><span class="n">get_template</span><span class="p">(</span><span class="n">content</span><span class="p">)</span><span class="o">.</span><span class="n">render</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">render_template_from_field</span><span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">content</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">jinja_env</span><span class="p">)</span></div>
<div class="viewcode-block" id="BaseOperator.get_template_env"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_template_env">[docs]</a> <span class="k">def</span> <span class="nf">get_template_env</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">get_template_env</span><span class="p">()</span> \
<span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">&#39;dag&#39;</span><span class="p">)</span> \
<span class="k">else</span> <span class="n">jinja2</span><span class="o">.</span><span class="n">Environment</span><span class="p">(</span><span class="n">cache_size</span><span class="o">=</span><span class="mi">0</span><span class="p">)</span></div>
<div class="viewcode-block" id="BaseOperator.prepare_template"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.prepare_template">[docs]</a> <span class="k">def</span> <span class="nf">prepare_template</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Hook that is triggered after the templated fields get replaced</span>
<span class="sd"> by their content. If you need your operator to alter the</span>
<span class="sd"> content of the file before the template is rendered,</span>
<span class="sd"> it should override this method to do so.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span></div>
<div class="viewcode-block" id="BaseOperator.resolve_template_files"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.resolve_template_files">[docs]</a> <span class="k">def</span> <span class="nf">resolve_template_files</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># Getting the content of files for template_field / template_ext</span>
<span class="k">for</span> <span class="n">attr</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">template_fields</span><span class="p">:</span>
<span class="n">content</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">attr</span><span class="p">)</span>
<span class="k">if</span> <span class="n">content</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">continue</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">)</span> <span class="ow">and</span> \
<span class="nb">any</span><span class="p">([</span><span class="n">content</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="n">ext</span><span class="p">)</span> <span class="k">for</span> <span class="n">ext</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">template_ext</span><span class="p">]):</span>
<span class="n">env</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_template_env</span><span class="p">()</span>
<span class="k">try</span><span class="p">:</span>
<span class="nb">setattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">attr</span><span class="p">,</span> <span class="n">env</span><span class="o">.</span><span class="n">loader</span><span class="o">.</span><span class="n">get_source</span><span class="p">(</span><span class="n">env</span><span class="p">,</span> <span class="n">content</span><span class="p">)[</span><span class="mi">0</span><span class="p">])</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
<span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span>
<span class="n">env</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">get_template_env</span><span class="p">()</span>
<span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="nb">len</span><span class="p">(</span><span class="n">content</span><span class="p">)):</span>
<span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">[</span><span class="n">i</span><span class="p">],</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">)</span> <span class="ow">and</span> \
<span class="nb">any</span><span class="p">([</span><span class="n">content</span><span class="p">[</span><span class="n">i</span><span class="p">]</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="n">ext</span><span class="p">)</span> <span class="k">for</span> <span class="n">ext</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">template_ext</span><span class="p">]):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">content</span><span class="p">[</span><span class="n">i</span><span class="p">]</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">loader</span><span class="o">.</span><span class="n">get_source</span><span class="p">(</span><span class="n">env</span><span class="p">,</span> <span class="n">content</span><span class="p">[</span><span class="n">i</span><span class="p">])[</span><span class="mi">0</span><span class="p">]</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">prepare_template</span><span class="p">()</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="BaseOperator.upstream_list"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.upstream_list">[docs]</a> <span class="k">def</span> <span class="nf">upstream_list</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;@property: list of tasks directly upstream&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">tid</span><span class="p">)</span> <span class="k">for</span> <span class="n">tid</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span><span class="p">]</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="BaseOperator.upstream_task_ids"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.upstream_task_ids">[docs]</a> <span class="k">def</span> <span class="nf">upstream_task_ids</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="BaseOperator.downstream_list"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.downstream_list">[docs]</a> <span class="k">def</span> <span class="nf">downstream_list</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;@property: list of tasks directly downstream&quot;&quot;&quot;</span>
<span class="k">return</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">tid</span><span class="p">)</span> <span class="k">for</span> <span class="n">tid</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_downstream_task_ids</span><span class="p">]</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="BaseOperator.downstream_task_ids"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.downstream_task_ids">[docs]</a> <span class="k">def</span> <span class="nf">downstream_task_ids</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_downstream_task_ids</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="BaseOperator.clear"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.clear">[docs]</a> <span class="k">def</span> <span class="nf">clear</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">downstream</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Clears the state of task instances associated with the task, following</span>
<span class="sd"> the parameters specified.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span>
<span class="n">qry</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>
<span class="k">if</span> <span class="n">start_date</span><span class="p">:</span>
<span class="n">qry</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&gt;=</span> <span class="n">start_date</span><span class="p">)</span>
<span class="k">if</span> <span class="n">end_date</span><span class="p">:</span>
<span class="n">qry</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&lt;=</span> <span class="n">end_date</span><span class="p">)</span>
<span class="n">tasks</span> <span class="o">=</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">]</span>
<span class="k">if</span> <span class="n">upstream</span><span class="p">:</span>
<span class="n">tasks</span> <span class="o">+=</span> <span class="p">[</span>
<span class="n">t</span><span class="o">.</span><span class="n">task_id</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_flat_relatives</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">True</span><span class="p">)]</span>
<span class="k">if</span> <span class="n">downstream</span><span class="p">:</span>
<span class="n">tasks</span> <span class="o">+=</span> <span class="p">[</span>
<span class="n">t</span><span class="o">.</span><span class="n">task_id</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_flat_relatives</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">)]</span>
<span class="n">qry</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">task_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">tasks</span><span class="p">))</span>
<span class="n">count</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">count</span><span class="p">()</span>
<span class="n">clear_task_instances</span><span class="p">(</span><span class="n">qry</span><span class="o">.</span><span class="n">all</span><span class="p">(),</span> <span class="n">session</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span>
<span class="k">return</span> <span class="n">count</span></div>
<span class="nd">@provide_session</span>
<div class="viewcode-block" id="BaseOperator.get_task_instances"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_task_instances">[docs]</a> <span class="k">def</span> <span class="nf">get_task_instances</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Get a set of task instance related to this task for a specific date</span>
<span class="sd"> range.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">end_date</span> <span class="o">=</span> <span class="n">end_date</span> <span class="ow">or</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="k">return</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TaskInstance</span><span class="p">)</span>\
<span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>\
<span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span>\
<span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&gt;=</span> <span class="n">start_date</span><span class="p">)</span>\
<span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&lt;=</span> <span class="n">end_date</span><span class="p">)</span>\
<span class="o">.</span><span class="n">order_by</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>\</div>
<span class="o">.</span><span class="n">all</span><span class="p">()</span>
<div class="viewcode-block" id="BaseOperator.get_flat_relative_ids"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_flat_relative_ids">[docs]</a> <span class="k">def</span> <span class="nf">get_flat_relative_ids</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">found_descendants</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Get a flat list of relatives&#39; ids, either upstream or downstream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="ow">not</span> <span class="n">found_descendants</span><span class="p">:</span>
<span class="n">found_descendants</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span>
<span class="n">relative_ids</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_direct_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="p">)</span>
<span class="k">for</span> <span class="n">relative_id</span> <span class="ow">in</span> <span class="n">relative_ids</span><span class="p">:</span>
<span class="k">if</span> <span class="n">relative_id</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">found_descendants</span><span class="p">:</span>
<span class="n">found_descendants</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">relative_id</span><span class="p">)</span>
<span class="n">relative_task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dag</span><span class="o">.</span><span class="n">task_dict</span><span class="p">[</span><span class="n">relative_id</span><span class="p">]</span>
<span class="n">relative_task</span><span class="o">.</span><span class="n">get_flat_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="p">,</span>
<span class="n">found_descendants</span><span class="p">)</span>
<span class="k">return</span> <span class="n">found_descendants</span></div>
<div class="viewcode-block" id="BaseOperator.get_flat_relatives"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_flat_relatives">[docs]</a> <span class="k">def</span> <span class="nf">get_flat_relatives</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Get a flat list of relatives, either upstream or downstream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="nb">list</span><span class="p">(</span><span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">task_id</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dag</span><span class="o">.</span><span class="n">task_dict</span><span class="p">[</span><span class="n">task_id</span><span class="p">],</span>
<span class="bp">self</span><span class="o">.</span><span class="n">get_flat_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="p">)))</span></div>
<div class="viewcode-block" id="BaseOperator.run"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.run">[docs]</a> <span class="k">def</span> <span class="nf">run</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">ignore_first_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span>
<span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Run a set of task instances for a date range.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">start_date</span> <span class="o">=</span> <span class="n">start_date</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span>
<span class="n">end_date</span> <span class="o">=</span> <span class="n">end_date</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="ow">or</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span>
<span class="k">for</span> <span class="n">dt</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">date_range</span><span class="p">(</span><span class="n">start_date</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="n">end_date</span><span class="p">):</span>
<span class="n">TaskInstance</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dt</span><span class="p">)</span><span class="o">.</span><span class="n">run</span><span class="p">(</span>
<span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span>
<span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="p">(</span>
<span class="n">dt</span> <span class="o">==</span> <span class="n">start_date</span> <span class="ow">and</span> <span class="n">ignore_first_depends_on_past</span><span class="p">),</span>
<span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">)</span></div>
<div class="viewcode-block" id="BaseOperator.dry_run"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.dry_run">[docs]</a> <span class="k">def</span> <span class="nf">dry_run</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Dry run&#39;</span><span class="p">)</span>
<span class="k">for</span> <span class="n">attr</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">template_fields</span><span class="p">:</span>
<span class="n">content</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">attr</span><span class="p">)</span>
<span class="k">if</span> <span class="n">content</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Rendering template for </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">attr</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="n">content</span><span class="p">)</span></div>
<div class="viewcode-block" id="BaseOperator.get_direct_relative_ids"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_direct_relative_ids">[docs]</a> <span class="k">def</span> <span class="nf">get_direct_relative_ids</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Get the direct relative ids to the current task, upstream or</span>
<span class="sd"> downstream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">upstream</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_downstream_task_ids</span></div>
<div class="viewcode-block" id="BaseOperator.get_direct_relatives"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_direct_relatives">[docs]</a> <span class="k">def</span> <span class="nf">get_direct_relatives</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Get the direct relatives to the current task, upstream or</span>
<span class="sd"> downstream.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">upstream</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">upstream_list</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">downstream_list</span></div>
<div class="viewcode-block" id="BaseOperator.__repr__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__repr__">[docs]</a> <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s2">&quot;&lt;Task(</span><span class="si">{self.__class__.__name__}</span><span class="s2">): </span><span class="si">{self.task_id}</span><span class="s2">&gt;&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="bp">self</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span></div>
<span class="nd">@property</span>
<div class="viewcode-block" id="BaseOperator.task_type"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.task_type">[docs]</a> <span class="k">def</span> <span class="nf">task_type</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span></div>
<div class="viewcode-block" id="BaseOperator.add_only_new"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.add_only_new">[docs]</a> <span class="k">def</span> <span class="nf">add_only_new</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">item_set</span><span class="p">,</span> <span class="n">item</span><span class="p">):</span>
<span class="k">if</span> <span class="n">item</span> <span class="ow">in</span> <span class="n">item_set</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span>
<span class="s1">&#39;Dependency </span><span class="si">{self}</span><span class="s1">, </span><span class="si">{item}</span><span class="s1"> already registered&#39;</span>
<span class="s1">&#39;&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">=</span><span class="bp">self</span><span class="p">,</span> <span class="n">item</span><span class="o">=</span><span class="n">item</span><span class="p">))</span>
<span class="k">else</span><span class="p">:</span>
<span class="n">item_set</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">item</span><span class="p">)</span></div>
<div class="viewcode-block" id="BaseOperator._set_relatives"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator._set_relatives">[docs]</a> <span class="k">def</span> <span class="nf">_set_relatives</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task_or_task_list</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">task_list</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">task_or_task_list</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span>
<span class="n">task_list</span> <span class="o">=</span> <span class="p">[</span><span class="n">task_or_task_list</span><span class="p">]</span>
<span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">task_list</span><span class="p">:</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">t</span><span class="p">,</span> <span class="n">BaseOperator</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
<span class="s2">&quot;Relationships can only be set between &quot;</span>
<span class="s2">&quot;Operators; received </span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">t</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">))</span>
<span class="c1"># relationships can only be set if the tasks share a single DAG. Tasks</span>
<span class="c1"># without a DAG are assigned to that DAG.</span>
<span class="n">dags</span> <span class="o">=</span> <span class="p">{</span><span class="n">t</span><span class="o">.</span><span class="n">_dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">:</span> <span class="n">t</span><span class="o">.</span><span class="n">_dag</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="p">[</span><span class="bp">self</span><span class="p">]</span> <span class="o">+</span> <span class="n">task_list</span> <span class="k">if</span> <span class="n">t</span><span class="o">.</span><span class="n">has_dag</span><span class="p">()}</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">dags</span><span class="p">)</span> <span class="o">&gt;</span> <span class="mi">1</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
<span class="s1">&#39;Tried to set relationships between tasks in &#39;</span>
<span class="s1">&#39;more than one DAG: </span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dags</span><span class="o">.</span><span class="n">values</span><span class="p">()))</span>
<span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="n">dags</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span>
<span class="n">dag</span> <span class="o">=</span> <span class="n">dags</span><span class="o">.</span><span class="n">popitem</span><span class="p">()[</span><span class="mi">1</span><span class="p">]</span>
<span class="k">else</span><span class="p">:</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
<span class="s2">&quot;Tried to create relationships between tasks that don&#39;t have &quot;</span>
<span class="s2">&quot;DAGs yet. Set the DAG for at least one &quot;</span>
<span class="s2">&quot;task and try again: </span><span class="si">{}</span><span class="s2">&quot;</span><span class="o">.</span><span class="n">format</span><span class="p">([</span><span class="bp">self</span><span class="p">]</span> <span class="o">+</span> <span class="n">task_list</span><span class="p">))</span>
<span class="k">if</span> <span class="n">dag</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">dag</span>
<span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">task_list</span><span class="p">:</span>
<span class="k">if</span> <span class="n">dag</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">task</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span>
<span class="n">task</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">dag</span>
<span class="k">if</span> <span class="n">upstream</span><span class="p">:</span>
<span class="n">task</span><span class="o">.</span><span class="n">add_only_new</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">get_direct_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">add_only_new</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">add_only_new</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_downstream_task_ids</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span>
<span class="n">task</span><span class="o">.</span><span class="n">add_only_new</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">get_direct_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">True</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span></div>
<div class="viewcode-block" id="BaseOperator.set_downstream"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.set_downstream">[docs]</a> <span class="k">def</span> <span class="nf">set_downstream</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task_or_task_list</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Set a task or a task list to be directly downstream from the current</span>
<span class="sd"> task.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_set_relatives</span><span class="p">(</span><span class="n">task_or_task_list</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div>
<div class="viewcode-block" id="BaseOperator.set_upstream"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.set_upstream">[docs]</a> <span class="k">def</span> <span class="nf">set_upstream</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task_or_task_list</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Set a task or a task list to be directly upstream from the current</span>
<span class="sd"> task.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_set_relatives</span><span class="p">(</span><span class="n">task_or_task_list</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div>
<div class="viewcode-block" id="BaseOperator.xcom_push"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.xcom_push">[docs]</a> <span class="k">def</span> <span class="nf">xcom_push</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">context</span><span class="p">,</span>
<span class="n">key</span><span class="p">,</span>
<span class="n">value</span><span class="p">,</span>
<span class="n">execution_date</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> See TaskInstance.xcom_push()</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">context</span><span class="p">[</span><span class="s1">&#39;ti&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span>
<span class="n">key</span><span class="o">=</span><span class="n">key</span><span class="p">,</span>
<span class="n">value</span><span class="o">=</span><span class="n">value</span><span class="p">,</span>
<span class="n">execution_date</span><span class="o">=</span><span class="n">execution_date</span><span class="p">)</span></div>
<div class="viewcode-block" id="BaseOperator.xcom_pull"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.xcom_pull">[docs]</a> <span class="k">def</span> <span class="nf">xcom_pull</span><span class="p">(</span>
<span class="bp">self</span><span class="p">,</span>
<span class="n">context</span><span class="p">,</span>
<span class="n">task_ids</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">dag_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">key</span><span class="o">=</span><span class="n">XCOM_RETURN_KEY</span><span class="p">,</span>
<span class="n">include_prior_dates</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> See TaskInstance.xcom_pull()</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">context</span><span class="p">[</span><span class="s1">&#39;ti&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_pull</span><span class="p">(</span>
<span class="n">key</span><span class="o">=</span><span class="n">key</span><span class="p">,</span>
<span class="n">task_ids</span><span class="o">=</span><span class="n">task_ids</span><span class="p">,</span>
<span class="n">dag_id</span><span class="o">=</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">include_prior_dates</span><span class="o">=</span><span class="n">include_prior_dates</span><span class="p">)</span></div>
<span class="nd">@cached_property</span>
<div class="viewcode-block" id="BaseOperator.extra_links"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.extra_links">[docs]</a> <span class="k">def</span> <span class="nf">extra_links</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; Iterable[str]</span>
<span class="k">return</span> <span class="nb">list</span><span class="p">(</span><span class="nb">set</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">operator_extra_link_dict</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span>
<span class="o">.</span><span class="n">union</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">global_operator_extra_link_dict</span><span class="o">.</span><span class="n">keys</span><span class="p">()))</span></div>
<div class="viewcode-block" id="BaseOperator.get_extra_links"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_extra_links">[docs]</a> <span class="k">def</span> <span class="nf">get_extra_links</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dttm</span><span class="p">,</span> <span class="n">link_name</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> For an operator, gets the URL that the external links specified in</span>
<span class="sd"> `extra_links` should point to.</span>
<span class="sd"> :raise ValueError: The error message of a ValueError will be passed on through to</span>
<span class="sd"> the fronted to show up as a tooltip on the disabled link</span>
<span class="sd"> :param dttm: The datetime parsed execution date for the URL being searched for</span>
<span class="sd"> :param link_name: The name of the link we&#39;re looking for the URL for. Should be</span>
<span class="sd"> one of the options specified in `extra_links`</span>
<span class="sd"> :return: A URL</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">if</span> <span class="n">link_name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">operator_extra_link_dict</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">operator_extra_link_dict</span><span class="p">[</span><span class="n">link_name</span><span class="p">]</span><span class="o">.</span><span class="n">get_link</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dttm</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">link_name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">global_operator_extra_link_dict</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">global_operator_extra_link_dict</span><span class="p">[</span><span class="n">link_name</span><span class="p">]</span><span class="o">.</span><span class="n">get_link</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dttm</span><span class="p">)</span></div></div>
<div class="viewcode-block" id="BaseOperatorLink"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperatorLink">[docs]</a><span class="k">class</span> <span class="nc">BaseOperatorLink</span><span class="p">:</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Abstract base class that defines how we get an operator link.</span>
<span class="sd"> &quot;&quot;&quot;</span>
<div class="viewcode-block" id="BaseOperatorLink.__metaclass__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperatorLink.__metaclass__">[docs]</a> <span class="n">__metaclass__</span> <span class="o">=</span> <span class="n">ABCMeta</span></div>
<span class="nd">@property</span>
<span class="nd">@abstractmethod</span>
<div class="viewcode-block" id="BaseOperatorLink.name"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperatorLink.name">[docs]</a> <span class="k">def</span> <span class="nf">name</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># type: () -&gt; str</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Name of the link. This will be the button name on the task UI.</span>
<span class="sd"> :return: link name</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span></div>
<span class="nd">@abstractmethod</span>
<div class="viewcode-block" id="BaseOperatorLink.get_link"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperatorLink.get_link">[docs]</a> <span class="k">def</span> <span class="nf">get_link</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">operator</span><span class="p">,</span> <span class="n">dttm</span><span class="p">):</span>
<span class="c1"># type: (BaseOperator, datetime) -&gt; str</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Link to external system.</span>
<span class="sd"> :param operator: airflow operator</span>
<span class="sd"> :param dttm: datetime</span>
<span class="sd"> :return: link to external system</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">pass</span></div></div>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
</p>
</div>
Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
<div class="footer">This page uses <a href="https://analytics.google.com/">
Google Analytics</a> to collect statistics. You can disable it by blocking
the JavaScript coming from www.google-analytics.com. Check our
<a href="../../../privacy_notice.html">Privacy Policy</a>
for more details.
<script type="text/javascript">
(function() {
var ga = document.createElement('script');
ga.src = ('https:' == document.location.protocol ?
'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
ga.setAttribute('async', 'true');
var nodes = document.documentElement.childNodes;
var i = -1;
var node;
do {
i++;
node = nodes[i]
} while(node.nodeType !== Node.ELEMENT_NODE);
node.appendChild(ga);
})();
</script>
</div>
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>