| |
| |
| |
| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>airflow.models.baseoperator — Airflow Documentation</title> |
| |
| |
| |
| |
| |
| |
| |
| |
| <script type="text/javascript" src="../../../_static/js/modernizr.min.js"></script> |
| |
| |
| <script type="text/javascript" id="documentation_options" data-url_root="../../../" src="../../../_static/documentation_options.js"></script> |
| <script type="text/javascript" src="../../../_static/jquery.js"></script> |
| <script type="text/javascript" src="../../../_static/underscore.js"></script> |
| <script type="text/javascript" src="../../../_static/doctools.js"></script> |
| <script type="text/javascript" src="../../../_static/language_data.js"></script> |
| |
| <script type="text/javascript" src="../../../_static/js/theme.js"></script> |
| |
| |
| |
| |
| <link rel="stylesheet" href="../../../_static/css/theme.css" type="text/css" /> |
| <link rel="stylesheet" href="../../../_static/pygments.css" type="text/css" /> |
| <link rel="index" title="Index" href="../../../genindex.html" /> |
| <link rel="search" title="Search" href="../../../search.html" /> |
| |
| <script> |
| // After the DOM has been parsed, rewrite issue and pull-request references |
| // inside the #changelog element (when the page has one) as hyperlinks. |
| document.addEventListener('DOMContentLoaded', function linkifyChangelog() { |
|   var changelog = document.getElementById('changelog'); |
|   if (changelog === null) { |
|     // No element with that id on this page: nothing to rewrite. |
|     return; |
|   } |
| |
|   // "[AIRFLOW-<digits>]" becomes a link to issues.apache.org/jira/browse/<key>. |
|   var jiraKey = /\[(AIRFLOW-[\d]+)\]/g; |
|   var jiraLink = `<a href="https://issues.apache.org/jira/browse/$1">[$1]</a>`; |
|   changelog.innerHTML = changelog.innerHTML.replace(jiraKey, jiraLink); |
| |
|   // "(#<digits>)" becomes a link to github.com/apache/airflow/pull/<number>. |
|   var pullRef = /\(#([\d]+)\)/g; |
|   var pullLink = `<a href="https://github.com/apache/airflow/pull/$1">(#$1)</a>`; |
|   changelog.innerHTML = changelog.innerHTML.replace(pullRef, pullLink); |
| }) |
| </script> |
| <script type="text/javascript"> |
| // Reuse the global _gaq value if one is already defined; otherwise start with an empty array. |
| var _gaq = _gaq || []; |
| // Queue two commands: set the account id, then record a page view. |
| // NOTE(review): no script that consumes _gaq is loaded in the visible part of this page; confirm it is included elsewhere. |
| _gaq.push(['_setAccount', 'UA-140539454-1']); |
| _gaq.push(['_trackPageview']); |
| </script> |
| <style> |
| /* Header strip; the rule further down removes the top margin of a */ |
| /* .highlight-python block that directly follows it. */ |
| .example-header { |
|   position: relative; |
|   background: #9AAA7A; |
|   padding: 8px 16px; |
|   margin-bottom: 0; |
| } |
| /* Extra right padding; presumably leaves room for the absolutely */ |
| /* positioned .example-header-button (confirm against the templates). */ |
| .example-header--with-button { |
|   padding-right: 166px; |
| } |
| /* Clearfix so the header contains any floated children. */ |
| .example-header:after { |
|   content: ''; |
|   display: table; |
|   clear: both; |
| } |
| .example-title { |
|   display: block; |
|   padding: 4px; |
|   margin-right: 16px; |
|   color: white; |
|   overflow-x: auto; |
| } |
| /* Pinned to the top-right corner of the (position: relative) header. */ |
| .example-header-button { |
|   top: 8px; |
|   right: 16px; |
|   position: absolute; |
| } |
| .example-header + .highlight-python { |
|   margin-top: 0 !important; |
| } |
| .viewcode-button { |
|   display: inline-block; |
|   padding: 8px 16px; |
|   border: 0; |
|   margin: 0; |
|   outline: 0; |
|   border-radius: 2px; |
|   /* Prefixed and unprefixed shadows now use the same 6px blur; the */ |
|   /* prefixed one previously used 5px, so engines disagreed slightly. */ |
|   -webkit-box-shadow: 0 3px 6px 0 rgba(0,0,0,.3); |
|   box-shadow: 0 3px 6px 0 rgba(0,0,0,.3); |
|   color: #404040; |
|   background-color: #e7e7e7; |
|   cursor: pointer; |
|   font-size: 16px; |
|   font-weight: 500; |
|   line-height: 1; |
|   text-decoration: none; |
|   text-overflow: ellipsis; |
|   overflow: hidden; |
|   text-transform: uppercase; |
|   -webkit-transition: background-color .2s; |
|   transition: background-color .2s; |
|   vertical-align: middle; |
|   white-space: nowrap; |
| } |
| .viewcode-button:visited { |
|   color: #404040; |
| } |
| .viewcode-button:hover, .viewcode-button:focus { |
|   color: #404040; |
|   background-color: #d6d6d6; |
| } |
| /* The base rule sets outline: 0, which removes the default focus ring. */ |
| /* Restore a visible ring for keyboard focus. Kept in its own rule so */ |
| /* browsers that do not know :focus-visible drop only this rule. */ |
| .viewcode-button:focus-visible { |
|   outline: 2px solid #404040; |
|   outline-offset: 2px; |
| } |
| </style> |
| |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
|   <!-- Side navigation: home link, version, search form and the top-level table of contents. --> |
|   <div class="wy-side-scroll"> |
|     <div class="wy-side-nav-search" > |
|       <a href="../../../index.html" class="icon icon-home"> Airflow |
|       </a> |
|       <div class="version"> |
|         1.10.5 |
|       </div> |
|       <div role="search"> |
|         <form id="rtd-search-form" class="wy-form" action="../../../search.html" method="get"> |
|           <!-- A placeholder alone is not an accessible name, so the field is labelled explicitly. --> |
|           <input type="text" name="q" placeholder="Search docs" aria-label="Search docs" /> |
|           <input type="hidden" name="check_keywords" value="yes" /> |
|           <input type="hidden" name="area" value="default" /> |
|         </form> |
|       </div> |
|     </div> |
|     <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
|       <ul> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../project.html">Project</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../license.html">License</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../start.html">Quick Start</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../installation.html">Installation</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../tutorial.html">Tutorial</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../howto/index.html">How-to Guides</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../ui.html">UI / Screenshots</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../concepts.html">Concepts</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../profiling.html">Data Profiling</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../cli.html">Command Line Interface Reference</a></li> |
|         <!-- The ampersand is written as an entity so it can never be read as the start of a character reference. --> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../scheduler.html">Scheduling &amp; Triggers</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../plugins.html">Plugins</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../security.html">Security</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../timezone.html">Time zones</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../api.html">REST API Reference</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../integration.html">Integration</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../metrics.html">Metrics</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../kubernetes.html">Kubernetes</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../lineage.html">Lineage</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../changelog.html">Changelog</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../faq.html">FAQ</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../macros.html">Macros reference</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../_api/index.html">Python API Reference</a></li> |
|         <li class="toctree-l1"><a class="reference internal" href="../../../privacy_notice.html">Privacy Notice</a></li> |
|       </ul> |
|     </div> |
|   </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| |
| <!-- Icon tagged with data-toggle="wy-nav-top"; presumably the theme script binds a click handler to it to show the side navigation (TODO confirm in theme.js). --> |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <!-- Link back to the documentation root page. --> |
| <a href="../../../index.html">Airflow</a> |
| |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <!-- Breadcrumb trail: Docs » Module code » airflow.models » current module. --> |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| |
| <ul class="wy-breadcrumbs"> |
| |
| <li><a href="../../../index.html">Docs</a> »</li> |
| |
| <li><a href="../../index.html">Module code</a> »</li> |
| |
| <li><a href="../models.html">airflow.models</a> »</li> |
| |
| <!-- Current page: plain text, not a link. --> |
| <li>airflow.models.baseoperator</li> |
| |
| |
| <!-- Aside slot of the breadcrumb bar; it has no content on this page. --> |
| <li class="wy-breadcrumbs-aside"> |
| |
| </li> |
| |
| </ul> |
| |
| |
| <hr/> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for airflow.models.baseoperator</h1><div class="highlight"><pre> |
| <span></span><span class="c1"># -*- coding: utf-8 -*-</span> |
| <span class="c1">#</span> |
| <span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span> |
| <span class="c1"># or more contributor license agreements. See the NOTICE file</span> |
| <span class="c1"># distributed with this work for additional information</span> |
| <span class="c1"># regarding copyright ownership. The ASF licenses this file</span> |
| <span class="c1"># to you under the Apache License, Version 2.0 (the</span> |
| <span class="c1"># "License"); you may not use this file except in compliance</span> |
| <span class="c1"># with the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing,</span> |
| <span class="c1"># software distributed under the License is distributed on an</span> |
| <span class="c1"># "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span> |
| <span class="c1"># KIND, either express or implied. See the License for the</span> |
| <span class="c1"># specific language governing permissions and limitations</span> |
| <span class="c1"># under the License.</span> |
| |
| <span class="kn">from</span> <span class="nn">abc</span> <span class="k">import</span> <span class="n">ABCMeta</span><span class="p">,</span> <span class="n">abstractmethod</span> |
| <span class="kn">from</span> <span class="nn">cached_property</span> <span class="k">import</span> <span class="n">cached_property</span> |
| <span class="kn">import</span> <span class="nn">copy</span> |
| <span class="kn">import</span> <span class="nn">functools</span> |
| <span class="kn">import</span> <span class="nn">logging</span> |
| <span class="kn">import</span> <span class="nn">sys</span> |
| <span class="kn">import</span> <span class="nn">warnings</span> |
| <span class="kn">from</span> <span class="nn">datetime</span> <span class="k">import</span> <span class="n">timedelta</span><span class="p">,</span> <span class="n">datetime</span> |
| <span class="kn">from</span> <span class="nn">typing</span> <span class="k">import</span> <span class="n">Iterable</span><span class="p">,</span> <span class="n">Optional</span><span class="p">,</span> <span class="n">Dict</span><span class="p">,</span> <span class="n">Callable</span><span class="p">,</span> <span class="n">Set</span> |
| |
| <span class="kn">import</span> <span class="nn">jinja2</span> |
| <span class="kn">import</span> <span class="nn">six</span> |
| |
| <span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">configuration</span><span class="p">,</span> <span class="n">settings</span> |
| <span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowException</span> |
| <span class="kn">from</span> <span class="nn">airflow.lineage</span> <span class="k">import</span> <span class="n">prepare_lineage</span><span class="p">,</span> <span class="n">apply_lineage</span><span class="p">,</span> <span class="n">DataSet</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.dag</span> <span class="k">import</span> <span class="n">DAG</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.pool</span> <span class="k">import</span> <span class="n">Pool</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.taskinstance</span> <span class="k">import</span> <span class="n">TaskInstance</span><span class="p">,</span> <span class="n">clear_task_instances</span> |
| <span class="kn">from</span> <span class="nn">airflow.models.xcom</span> <span class="k">import</span> <span class="n">XCOM_RETURN_KEY</span> |
| <span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.not_in_retry_period_dep</span> <span class="k">import</span> <span class="n">NotInRetryPeriodDep</span> |
| <span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.prev_dagrun_dep</span> <span class="k">import</span> <span class="n">PrevDagrunDep</span> |
| <span class="kn">from</span> <span class="nn">airflow.ti_deps.deps.trigger_rule_dep</span> <span class="k">import</span> <span class="n">TriggerRuleDep</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils</span> <span class="k">import</span> <span class="n">timezone</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.db</span> <span class="k">import</span> <span class="n">provide_session</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.decorators</span> <span class="k">import</span> <span class="n">apply_defaults</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.helpers</span> <span class="k">import</span> <span class="n">validate_key</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.log.logging_mixin</span> <span class="k">import</span> <span class="n">LoggingMixin</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.operator_resources</span> <span class="k">import</span> <span class="n">Resources</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.trigger_rule</span> <span class="k">import</span> <span class="n">TriggerRule</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.weight_rule</span> <span class="k">import</span> <span class="n">WeightRule</span> |
| |
| |
| <div class="viewcode-block" id="BaseOperator"><a class="viewcode-back" href="../../../_api/airflow/models/index.html#airflow.models.baseoperator.BaseOperator">[docs]</a><span class="nd">@functools</span><span class="o">.</span><span class="n">total_ordering</span> |
| <span class="k">class</span> <span class="nc">BaseOperator</span><span class="p">(</span><span class="n">LoggingMixin</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Abstract base class for all operators. Since operators create objects that</span> |
| <span class="sd"> become nodes in the dag, BaseOperator contains many recursive methods for</span> |
| <span class="sd"> dag crawling behavior. To derive this class, you are expected to override</span> |
| <span class="sd"> the constructor as well as the 'execute' method.</span> |
| |
| <span class="sd"> Operators derived from this class should perform or trigger certain tasks</span> |
| <span class="sd"> synchronously (wait for completion). Example of operators could be an</span> |
| <span class="sd"> operator that runs a Pig job (PigOperator), a sensor operator that</span> |
| <span class="sd"> waits for a partition to land in Hive (HiveSensorOperator), or one that</span> |
| <span class="sd"> moves data from Hive to MySQL (Hive2MySqlOperator). Instances of these</span> |
| <span class="sd"> operators (tasks) target specific operations, running specific scripts,</span> |
| <span class="sd"> functions or data transfers.</span> |
| |
| <span class="sd"> This class is abstract and shouldn't be instantiated. Instantiating a</span> |
| <span class="sd"> class derived from this one results in the creation of a task object,</span> |
| <span class="sd"> which ultimately becomes a node in DAG objects. Task dependencies should</span> |
| <span class="sd"> be set by using the set_upstream and/or set_downstream methods.</span> |
| |
| <span class="sd"> :param task_id: a unique, meaningful id for the task</span> |
| <span class="sd"> :type task_id: str</span> |
| <span class="sd"> :param owner: the owner of the task, using the unix username is recommended</span> |
| <span class="sd"> :type owner: str</span> |
| <span class="sd"> :param retries: the number of retries that should be performed before</span> |
| <span class="sd"> failing the task</span> |
| <span class="sd"> :type retries: int</span> |
| <span class="sd"> :param retry_delay: delay between retries</span> |
| <span class="sd"> :type retry_delay: datetime.timedelta</span> |
| <span class="sd"> :param retry_exponential_backoff: allow progressive longer waits between</span> |
| <span class="sd"> retries by using exponential backoff algorithm on retry delay (delay</span> |
| <span class="sd"> will be converted into seconds)</span> |
| <span class="sd"> :type retry_exponential_backoff: bool</span> |
| <span class="sd"> :param max_retry_delay: maximum delay interval between retries</span> |
| <span class="sd"> :type max_retry_delay: datetime.timedelta</span> |
| <span class="sd"> :param start_date: The ``start_date`` for the task, determines</span> |
| <span class="sd"> the ``execution_date`` for the first task instance. The best practice</span> |
| <span class="sd"> is to have the start_date rounded</span> |
| <span class="sd"> to your DAG's ``schedule_interval``. Daily jobs have their start_date</span> |
| <span class="sd"> some day at 00:00:00, hourly jobs have their start_date at 00:00</span> |
| <span class="sd"> of a specific hour. Note that Airflow simply looks at the latest</span> |
| <span class="sd"> ``execution_date`` and adds the ``schedule_interval`` to determine</span> |
| <span class="sd"> the next ``execution_date``. It is also very important</span> |
| <span class="sd"> to note that different tasks' dependencies</span> |
| <span class="sd"> need to line up in time. If task A depends on task B and their</span> |
| <span class="sd"> start_date are offset in a way that their execution_date don't line</span> |
| <span class="sd"> up, A's dependencies will never be met. If you are looking to delay</span> |
| <span class="sd"> a task, for example running a daily task at 2AM, look into the</span> |
| <span class="sd"> ``TimeSensor`` and ``TimeDeltaSensor``. We advise against using</span> |
| <span class="sd"> dynamic ``start_date`` and recommend using fixed ones. Read the</span> |
| <span class="sd"> FAQ entry about start_date for more information.</span> |
| <span class="sd"> :type start_date: datetime.datetime</span> |
| <span class="sd"> :param end_date: if specified, the scheduler won't go beyond this date</span> |
| <span class="sd"> :type end_date: datetime.datetime</span> |
| <span class="sd"> :param depends_on_past: when set to true, task instances will run</span> |
| <span class="sd"> sequentially while relying on the previous task's schedule to</span> |
| <span class="sd"> succeed. The task instance for the start_date is allowed to run.</span> |
| <span class="sd"> :type depends_on_past: bool</span> |
| <span class="sd"> :param wait_for_downstream: when set to true, an instance of task</span> |
| <span class="sd"> X will wait for tasks immediately downstream of the previous instance</span> |
| <span class="sd"> of task X to finish successfully before it runs. This is useful if the</span> |
| <span class="sd"> different instances of a task X alter the same asset, and this asset</span> |
| <span class="sd"> is used by tasks downstream of task X. Note that depends_on_past</span> |
| <span class="sd"> is forced to True wherever wait_for_downstream is used.</span> |
| <span class="sd"> :type wait_for_downstream: bool</span> |
| <span class="sd"> :param queue: which queue to target when running this job. Not</span> |
| <span class="sd"> all executors implement queue management, the CeleryExecutor</span> |
| <span class="sd"> does support targeting specific queues.</span> |
| <span class="sd"> :type queue: str</span> |
| <span class="sd"> :param dag: a reference to the dag the task is attached to (if any)</span> |
| <span class="sd"> :type dag: airflow.models.DAG</span> |
| <span class="sd"> :param priority_weight: priority weight of this task against other tasks.</span> |
| <span class="sd"> This allows the executor to trigger higher priority tasks before</span> |
| <span class="sd"> others when things get backed up. Set priority_weight as a higher</span> |
| <span class="sd"> number for more important tasks.</span> |
| <span class="sd"> :type priority_weight: int</span> |
| <span class="sd"> :param weight_rule: weighting method used for the effective total</span> |
| <span class="sd"> priority weight of the task. Options are:</span> |
| <span class="sd"> ``{ downstream | upstream | absolute }`` default is ``downstream``</span> |
| <span class="sd"> When set to ``downstream`` the effective weight of the task is the</span> |
| <span class="sd"> aggregate sum of all downstream descendants. As a result, upstream</span> |
| <span class="sd"> tasks will have higher weight and will be scheduled more aggressively</span> |
| <span class="sd"> when using positive weight values. This is useful when you have</span> |
| <span class="sd"> multiple dag run instances and desire to have all upstream tasks to</span> |
| <span class="sd"> complete for all runs before each dag can continue processing</span> |
| <span class="sd"> downstream tasks. When set to ``upstream`` the effective weight is the</span> |
| <span class="sd"> aggregate sum of all upstream ancestors. This is the opposite where</span> |
| <span class="sd"> downstream tasks have higher weight and will be scheduled more</span> |
| <span class="sd"> aggressively when using positive weight values. This is useful when you</span> |
| <span class="sd"> have multiple dag run instances and prefer to have each dag complete</span> |
| <span class="sd"> before starting upstream tasks of other dags. When set to</span> |
| <span class="sd"> ``absolute``, the effective weight is the exact ``priority_weight``</span> |
| <span class="sd"> specified without additional weighting. You may want to do this when</span> |
| <span class="sd"> you know exactly what priority weight each task should have.</span> |
| <span class="sd"> Additionally, when set to ``absolute``, there is a bonus effect of</span> |
| <span class="sd"> significantly speeding up the task creation process for very large</span> |
| <span class="sd"> DAGs. Options can be set as a string or using the constants defined in</span> |
| <span class="sd"> the static class ``airflow.utils.WeightRule``</span> |
| <span class="sd"> :type weight_rule: str</span> |
| <span class="sd"> :param pool: the slot pool this task should run in, slot pools are a</span> |
| <span class="sd"> way to limit concurrency for certain tasks</span> |
| <span class="sd"> :type pool: str</span> |
| <span class="sd"> :param sla: time by which the job is expected to succeed. Note that</span> |
| <span class="sd"> this represents the ``timedelta`` after the period is closed. For</span> |
| <span class="sd"> example if you set an SLA of 1 hour, the scheduler would send an email</span> |
| <span class="sd"> soon after 1:00AM on the ``2016-01-02`` if the ``2016-01-01`` instance</span> |
| <span class="sd"> has not succeeded yet.</span> |
| <span class="sd"> The scheduler pays special attention to jobs with an SLA and</span> |
| <span class="sd"> sends alert</span> |
| <span class="sd"> emails for SLA misses. SLA misses are also recorded in the database</span> |
| <span class="sd"> for future reference. All tasks that share the same SLA time</span> |
| <span class="sd"> get bundled in a single email, sent soon after that time. SLA</span> |
| <span class="sd"> notifications are sent once and only once for each task instance.</span> |
| <span class="sd"> :type sla: datetime.timedelta</span> |
| <span class="sd"> :param execution_timeout: max time allowed for the execution of</span> |
| <span class="sd"> this task instance, if it goes beyond it will raise and fail.</span> |
| <span class="sd"> :type execution_timeout: datetime.timedelta</span> |
| <span class="sd"> :param on_failure_callback: a function to be called when a task instance</span> |
| <span class="sd"> of this task fails. a context dictionary is passed as a single</span> |
| <span class="sd"> parameter to this function. Context contains references to related</span> |
| <span class="sd"> objects to the task instance and is documented under the macros</span> |
| <span class="sd"> section of the API.</span> |
| <span class="sd"> :type on_failure_callback: callable</span> |
| <span class="sd"> :param on_retry_callback: much like the ``on_failure_callback`` except</span> |
| <span class="sd"> that it is executed when retries occur.</span> |
| <span class="sd"> :type on_retry_callback: callable</span> |
| <span class="sd"> :param on_success_callback: much like the ``on_failure_callback`` except</span> |
| <span class="sd"> that it is executed when the task succeeds.</span> |
| <span class="sd"> :type on_success_callback: callable</span> |
| <span class="sd"> :param trigger_rule: defines the rule by which dependencies are applied</span> |
| <span class="sd"> for the task to get triggered. Options are:</span> |
| <span class="sd"> ``{ all_success | all_failed | all_done | one_success |</span> |
| <span class="sd"> one_failed | none_failed | none_skipped | dummy}``</span> |
| <span class="sd"> default is ``all_success``. Options can be set as string or</span> |
| <span class="sd"> using the constants defined in the static class</span> |
| <span class="sd"> ``airflow.utils.TriggerRule``</span> |
| <span class="sd"> :type trigger_rule: str</span> |
| <span class="sd"> :param resources: A map of resource parameter names (the argument names of the</span> |
| <span class="sd"> Resources constructor) to their values.</span> |
| <span class="sd"> :type resources: dict</span> |
| <span class="sd"> :param run_as_user: unix username to impersonate while running the task</span> |
| <span class="sd"> :type run_as_user: str</span> |
| <span class="sd"> :param task_concurrency: When set, a task will be able to limit the concurrent</span> |
| <span class="sd"> runs across execution_dates</span> |
| <span class="sd"> :type task_concurrency: int</span> |
| <span class="sd"> :param executor_config: Additional task-level configuration parameters that are</span> |
| <span class="sd"> interpreted by a specific executor. Parameters are namespaced by the name of</span> |
| <span class="sd"> executor.</span> |
| |
| <span class="sd"> **Example**: to run this task in a specific docker container through</span> |
| <span class="sd"> the KubernetesExecutor ::</span> |
| |
| <span class="sd"> MyOperator(...,</span> |
| <span class="sd"> executor_config={</span> |
| <span class="sd"> "KubernetesExecutor":</span> |
| <span class="sd"> {"image": "myCustomDockerImage"}</span> |
| <span class="sd"> }</span> |
| <span class="sd"> )</span> |
| |
| <span class="sd"> :type executor_config: dict</span> |
| <span class="sd"> :param do_xcom_push: if True, an XCom is pushed containing the Operator's</span> |
| <span class="sd"> result</span> |
| <span class="sd"> :type do_xcom_push: bool</span> |
| <span class="sd"> """</span> |
| |
| <span class="c1"># For derived classes to define which fields will get jinjaified</span> |
| <div class="viewcode-block" id="BaseOperator.template_fields"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.template_fields">[docs]</a> <span class="n">template_fields</span> <span class="o">=</span> <span class="p">[]</span> <span class="c1"># type: Iterable[str]</span></div> |
| <span class="c1"># Defines which file extensions to look for in the templated fields</span> |
| <div class="viewcode-block" id="BaseOperator.template_ext"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.template_ext">[docs]</a> <span class="n">template_ext</span> <span class="o">=</span> <span class="p">[]</span> <span class="c1"># type: Iterable[str]</span></div> |
| <span class="c1"># Defines the color in the UI</span> |
| <div class="viewcode-block" id="BaseOperator.ui_color"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.ui_color">[docs]</a> <span class="n">ui_color</span> <span class="o">=</span> <span class="s1">'#fff'</span></div> |
| <div class="viewcode-block" id="BaseOperator.ui_fgcolor"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.ui_fgcolor">[docs]</a> <span class="n">ui_fgcolor</span> <span class="o">=</span> <span class="s1">'#000'</span></div> |
| |
| <span class="c1"># base list which includes all the attrs that don't need deep copy.</span> |
| <div class="viewcode-block" id="BaseOperator._base_operator_shallow_copy_attrs"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator._base_operator_shallow_copy_attrs">[docs]</a> <span class="n">_base_operator_shallow_copy_attrs</span> <span class="o">=</span> <span class="p">(</span><span class="s1">'user_defined_macros'</span><span class="p">,</span> |
| <span class="s1">'user_defined_filters'</span><span class="p">,</span> |
| <span class="s1">'params'</span><span class="p">,</span> |
| <span class="s1">'_log'</span><span class="p">,)</span></div> |
| |
| <span class="c1"># each operator should override this class attr for shallow copy attrs.</span> |
| <div class="viewcode-block" id="BaseOperator.shallow_copy_attrs"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.shallow_copy_attrs">[docs]</a> <span class="n">shallow_copy_attrs</span> <span class="o">=</span> <span class="p">()</span> <span class="c1"># type: Iterable[str]</span></div> |
| |
| <span class="c1"># Defines the operator level extra links</span> |
| <div class="viewcode-block" id="BaseOperator.operator_extra_links"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.operator_extra_links">[docs]</a> <span class="n">operator_extra_links</span> <span class="o">=</span> <span class="p">()</span> <span class="c1"># type: Iterable[BaseOperatorLink]</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator._comps"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator._comps">[docs]</a> <span class="n">_comps</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="s1">'task_id'</span><span class="p">,</span> |
| <span class="s1">'dag_id'</span><span class="p">,</span> |
| <span class="s1">'owner'</span><span class="p">,</span> |
| <span class="s1">'email'</span><span class="p">,</span> |
| <span class="s1">'email_on_retry'</span><span class="p">,</span> |
| <span class="s1">'retry_delay'</span><span class="p">,</span> |
| <span class="s1">'retry_exponential_backoff'</span><span class="p">,</span> |
| <span class="s1">'max_retry_delay'</span><span class="p">,</span> |
| <span class="s1">'start_date'</span><span class="p">,</span> |
| <span class="s1">'schedule_interval'</span><span class="p">,</span> |
| <span class="s1">'depends_on_past'</span><span class="p">,</span> |
| <span class="s1">'wait_for_downstream'</span><span class="p">,</span> |
| <span class="s1">'priority_weight'</span><span class="p">,</span> |
| <span class="s1">'sla'</span><span class="p">,</span> |
| <span class="s1">'execution_timeout'</span><span class="p">,</span> |
| <span class="s1">'on_failure_callback'</span><span class="p">,</span> |
| <span class="s1">'on_success_callback'</span><span class="p">,</span> |
| <span class="s1">'on_retry_callback'</span><span class="p">,</span> |
| <span class="s1">'do_xcom_push'</span><span class="p">,</span></div> |
| <span class="p">}</span> |
| |
| <span class="nd">@apply_defaults</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">task_id</span><span class="p">,</span> <span class="c1"># type: str</span> |
| <span class="n">owner</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'operators'</span><span class="p">,</span> <span class="s1">'DEFAULT_OWNER'</span><span class="p">),</span> <span class="c1"># type: str</span> |
| <span class="n">email</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[str]</span> |
| <span class="n">email_on_retry</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="c1"># type: bool</span> |
| <span class="n">email_on_failure</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="c1"># type: bool</span> |
| <span class="n">retries</span><span class="o">=</span><span class="mi">0</span><span class="p">,</span> <span class="c1"># type: int</span> |
| <span class="n">retry_delay</span><span class="o">=</span><span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="mi">300</span><span class="p">),</span> <span class="c1"># type: timedelta</span> |
| <span class="n">retry_exponential_backoff</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="c1"># type: bool</span> |
| <span class="n">max_retry_delay</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[datetime]</span> |
| <span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[datetime]</span> |
| <span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[datetime]</span> |
| <span class="n">schedule_interval</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># not hooked as of now</span> |
| <span class="n">depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="c1"># type: bool</span> |
| <span class="n">wait_for_downstream</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="c1"># type: bool</span> |
| <span class="n">dag</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[DAG]</span> |
| <span class="n">params</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span> |
| <span class="n">default_args</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span> |
| <span class="n">priority_weight</span><span class="o">=</span><span class="mi">1</span><span class="p">,</span> <span class="c1"># type: int</span> |
| <span class="n">weight_rule</span><span class="o">=</span><span class="n">WeightRule</span><span class="o">.</span><span class="n">DOWNSTREAM</span><span class="p">,</span> <span class="c1"># type: str</span> |
| <span class="n">queue</span><span class="o">=</span><span class="n">configuration</span><span class="o">.</span><span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'celery'</span><span class="p">,</span> <span class="s1">'default_queue'</span><span class="p">),</span> <span class="c1"># type: str</span> |
| <span class="n">pool</span><span class="o">=</span><span class="n">Pool</span><span class="o">.</span><span class="n">DEFAULT_POOL_NAME</span><span class="p">,</span> <span class="c1"># type: str</span> |
| <span class="n">sla</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[timedelta]</span> |
| <span class="n">execution_timeout</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[timedelta]</span> |
| <span class="n">on_failure_callback</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Callable]</span> |
| <span class="n">on_success_callback</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Callable]</span> |
| <span class="n">on_retry_callback</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Callable]</span> |
| <span class="n">trigger_rule</span><span class="o">=</span><span class="n">TriggerRule</span><span class="o">.</span><span class="n">ALL_SUCCESS</span><span class="p">,</span> <span class="c1"># type: str</span> |
| <span class="n">resources</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span> |
| <span class="n">run_as_user</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[str]</span> |
| <span class="n">task_concurrency</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[int]</span> |
| <span class="n">executor_config</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span> |
| <span class="n">do_xcom_push</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="c1"># type: bool</span> |
| <span class="n">inlets</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span> |
| <span class="n">outlets</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="c1"># type: Optional[Dict]</span> |
| <span class="o">*</span><span class="n">args</span><span class="p">,</span> |
| <span class="o">**</span><span class="n">kwargs</span> |
| <span class="p">):</span> |
| |
| <span class="k">if</span> <span class="n">args</span> <span class="ow">or</span> <span class="n">kwargs</span><span class="p">:</span> |
| <span class="c1"># TODO remove *args and **kwargs in Airflow 2.0</span> |
| <span class="n">warnings</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span> |
| <span class="s1">'Invalid arguments were passed to </span><span class="si">{c}</span><span class="s1"> (task_id: </span><span class="si">{t}</span><span class="s1">). '</span> |
| <span class="s1">'Support for passing such arguments will be dropped in '</span> |
| <span class="s1">'Airflow 2.0. Invalid arguments were:'</span> |
| <span class="s1">'</span><span class="se">\n</span><span class="s1">*args: </span><span class="si">{a}</span><span class="se">\n</span><span class="s1">**kwargs: </span><span class="si">{k}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">c</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="n">a</span><span class="o">=</span><span class="n">args</span><span class="p">,</span> <span class="n">k</span><span class="o">=</span><span class="n">kwargs</span><span class="p">,</span> <span class="n">t</span><span class="o">=</span><span class="n">task_id</span><span class="p">),</span> |
| <span class="n">category</span><span class="o">=</span><span class="ne">PendingDeprecationWarning</span><span class="p">,</span> |
| <span class="n">stacklevel</span><span class="o">=</span><span class="mi">3</span> |
| <span class="p">)</span> |
| <span class="n">validate_key</span><span class="p">(</span><span class="n">task_id</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="o">=</span> <span class="n">task_id</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">owner</span> <span class="o">=</span> <span class="n">owner</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">email</span> <span class="o">=</span> <span class="n">email</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">email_on_retry</span> <span class="o">=</span> <span class="n">email_on_retry</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">email_on_failure</span> <span class="o">=</span> <span class="n">email_on_failure</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">start_date</span> |
| <span class="k">if</span> <span class="n">start_date</span> <span class="ow">and</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">start_date</span><span class="p">,</span> <span class="n">datetime</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s2">"start_date for </span><span class="si">%s</span><span class="s2"> isn't datetime.datetime"</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">start_date</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="n">start_date</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">end_date</span> |
| <span class="k">if</span> <span class="n">end_date</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="o">=</span> <span class="n">timezone</span><span class="o">.</span><span class="n">convert_to_utc</span><span class="p">(</span><span class="n">end_date</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">TriggerRule</span><span class="o">.</span><span class="n">is_valid</span><span class="p">(</span><span class="n">trigger_rule</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span> |
| <span class="s2">"The trigger_rule must be one of </span><span class="si">{all_triggers}</span><span class="s2">,"</span> |
| <span class="s2">"'</span><span class="si">{d}</span><span class="s2">.</span><span class="si">{t}</span><span class="s2">'; received '</span><span class="si">{tr}</span><span class="s2">'."</span> |
| <span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">all_triggers</span><span class="o">=</span><span class="n">TriggerRule</span><span class="o">.</span><span class="n">all_triggers</span><span class="p">(),</span> |
| <span class="n">d</span><span class="o">=</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">if</span> <span class="n">dag</span> <span class="k">else</span> <span class="s2">""</span><span class="p">,</span> <span class="n">t</span><span class="o">=</span><span class="n">task_id</span><span class="p">,</span> <span class="n">tr</span><span class="o">=</span><span class="n">trigger_rule</span><span class="p">))</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">trigger_rule</span> <span class="o">=</span> <span class="n">trigger_rule</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">depends_on_past</span> <span class="o">=</span> <span class="n">depends_on_past</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">wait_for_downstream</span> <span class="o">=</span> <span class="n">wait_for_downstream</span> |
| <span class="k">if</span> <span class="n">wait_for_downstream</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">depends_on_past</span> <span class="o">=</span> <span class="kc">True</span> |
| |
| <span class="k">if</span> <span class="n">schedule_interval</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span> |
| <span class="s2">"schedule_interval is used for </span><span class="si">%s</span><span class="s2">, though it has "</span> |
| <span class="s2">"been deprecated as a task parameter, you need to "</span> |
| <span class="s2">"specify it as a DAG parameter instead"</span><span class="p">,</span> |
| <span class="bp">self</span> |
| <span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span> <span class="o">=</span> <span class="n">schedule_interval</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">retries</span> <span class="o">=</span> <span class="n">retries</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">queue</span> <span class="o">=</span> <span class="n">queue</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">pool</span> <span class="o">=</span> <span class="n">pool</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">sla</span> <span class="o">=</span> <span class="n">sla</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">execution_timeout</span> <span class="o">=</span> <span class="n">execution_timeout</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">on_failure_callback</span> <span class="o">=</span> <span class="n">on_failure_callback</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">on_success_callback</span> <span class="o">=</span> <span class="n">on_success_callback</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">on_retry_callback</span> <span class="o">=</span> <span class="n">on_retry_callback</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">retry_delay</span><span class="p">,</span> <span class="n">timedelta</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">retry_delay</span> <span class="o">=</span> <span class="n">retry_delay</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Retry_delay isn't timedelta object, assuming secs"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">retry_delay</span> <span class="o">=</span> <span class="n">timedelta</span><span class="p">(</span><span class="n">seconds</span><span class="o">=</span><span class="n">retry_delay</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">retry_exponential_backoff</span> <span class="o">=</span> <span class="n">retry_exponential_backoff</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">max_retry_delay</span> <span class="o">=</span> <span class="n">max_retry_delay</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">params</span> <span class="o">=</span> <span class="n">params</span> <span class="ow">or</span> <span class="p">{}</span> <span class="c1"># Available in templates!</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span> <span class="o">=</span> <span class="n">priority_weight</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">WeightRule</span><span class="o">.</span><span class="n">is_valid</span><span class="p">(</span><span class="n">weight_rule</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span> |
| <span class="s2">"The weight_rule must be one of </span><span class="si">{all_weight_rules}</span><span class="s2">,"</span> |
| <span class="s2">"'</span><span class="si">{d}</span><span class="s2">.</span><span class="si">{t}</span><span class="s2">'; received '</span><span class="si">{tr}</span><span class="s2">'."</span> |
| <span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">all_weight_rules</span><span class="o">=</span><span class="n">WeightRule</span><span class="o">.</span><span class="n">all_weight_rules</span><span class="p">,</span> |
| <span class="n">d</span><span class="o">=</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span> <span class="k">if</span> <span class="n">dag</span> <span class="k">else</span> <span class="s2">""</span><span class="p">,</span> <span class="n">t</span><span class="o">=</span><span class="n">task_id</span><span class="p">,</span> <span class="n">tr</span><span class="o">=</span><span class="n">weight_rule</span><span class="p">))</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">weight_rule</span> <span class="o">=</span> <span class="n">weight_rule</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">resources</span> <span class="o">=</span> <span class="n">Resources</span><span class="p">(</span><span class="o">*</span><span class="n">resources</span><span class="p">)</span> <span class="k">if</span> <span class="n">resources</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span> <span class="k">else</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">run_as_user</span> <span class="o">=</span> <span class="n">run_as_user</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_concurrency</span> <span class="o">=</span> <span class="n">task_concurrency</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">executor_config</span> <span class="o">=</span> <span class="n">executor_config</span> <span class="ow">or</span> <span class="p">{}</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">do_xcom_push</span> <span class="o">=</span> <span class="n">do_xcom_push</span> |
| |
| <span class="c1"># Private attributes</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span> <span class="c1"># type: Set[str]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_downstream_task_ids</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span> <span class="c1"># type: Set[str]</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">dag</span> <span class="ow">and</span> <span class="n">settings</span><span class="o">.</span><span class="n">CONTEXT_MANAGER_DAG</span><span class="p">:</span> |
| <span class="n">dag</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">CONTEXT_MANAGER_DAG</span> |
| <span class="k">if</span> <span class="n">dag</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">dag</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_log</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s2">"airflow.task.operators"</span><span class="p">)</span> |
| |
| <span class="c1"># lineage</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">inlets</span> <span class="o">=</span> <span class="p">[]</span> <span class="c1"># type: Iterable[DataSet]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">outlets</span> <span class="o">=</span> <span class="p">[]</span> <span class="c1"># type: Iterable[DataSet]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">lineage_data</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_inlets</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="s2">"auto"</span><span class="p">:</span> <span class="kc">False</span><span class="p">,</span> |
| <span class="s2">"task_ids"</span><span class="p">:</span> <span class="p">[],</span> |
| <span class="s2">"datasets"</span><span class="p">:</span> <span class="p">[],</span> |
| <span class="p">}</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_outlets</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="s2">"datasets"</span><span class="p">:</span> <span class="p">[],</span> |
| <span class="p">}</span> <span class="c1"># type: Dict</span> |
| |
| <span class="k">if</span> <span class="n">inlets</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_inlets</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">inlets</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">outlets</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_outlets</span><span class="o">.</span><span class="n">update</span><span class="p">(</span><span class="n">outlets</span><span class="p">)</span> |
| |
| <div class="viewcode-block" id="BaseOperator.__eq__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__eq__">[docs]</a> <span class="k">def</span> <span class="nf">__eq__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span> |
| <span class="k">if</span> <span class="p">(</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> <span class="o">==</span> <span class="nb">type</span><span class="p">(</span><span class="n">other</span><span class="p">)</span> <span class="ow">and</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="n">task_id</span><span class="p">):</span> |
| <span class="k">return</span> <span class="nb">all</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="o">==</span> <span class="n">other</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">c</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="k">for</span> <span class="n">c</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_comps</span><span class="p">)</span> |
| <span class="k">return</span> <span class="kc">False</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.__ne__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__ne__">[docs]</a> <span class="k">def</span> <span class="nf">__ne__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span> |
| <span class="k">return</span> <span class="ow">not</span> <span class="bp">self</span> <span class="o">==</span> <span class="n">other</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.__lt__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__lt__">[docs]</a> <span class="k">def</span> <span class="nf">__lt__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="o">&lt;</span> <span class="n">other</span><span class="o">.</span><span class="n">task_id</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.__hash__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__hash__">[docs]</a> <span class="k">def</span> <span class="nf">__hash__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">hash_components</span> <span class="o">=</span> <span class="p">[</span><span class="nb">type</span><span class="p">(</span><span class="bp">self</span><span class="p">)]</span> |
| <span class="k">for</span> <span class="n">c</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_comps</span><span class="p">:</span> |
| <span class="n">val</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">c</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="nb">hash</span><span class="p">(</span><span class="n">val</span><span class="p">)</span> |
| <span class="n">hash_components</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="n">val</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span> |
| <span class="n">hash_components</span><span class="o">.</span><span class="n">append</span><span class="p">(</span><span class="nb">repr</span><span class="p">(</span><span class="n">val</span><span class="p">))</span> |
| <span class="k">return</span> <span class="nb">hash</span><span class="p">(</span><span class="nb">tuple</span><span class="p">(</span><span class="n">hash_components</span><span class="p">))</span></div> |
| |
| <span class="c1"># Composing Operators -----------------------------------------------</span> |
| |
| <div class="viewcode-block" id="BaseOperator.__rshift__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__rshift__">[docs]</a> <span class="k">def</span> <span class="nf">__rshift__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Implements Self &gt;&gt; Other == self.set_downstream(other)</span> |
| |
| <span class="sd"> If "Other" is a DAG, the DAG is assigned to the Operator.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">other</span><span class="p">,</span> <span class="n">DAG</span><span class="p">):</span> |
| <span class="c1"># if this dag is already assigned, do nothing</span> |
| <span class="c1"># otherwise, do normal dag assignment</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">()</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="ow">is</span> <span class="n">other</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">other</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">set_downstream</span><span class="p">(</span><span class="n">other</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">other</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.__lshift__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__lshift__">[docs]</a> <span class="k">def</span> <span class="nf">__lshift__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Implements Self &lt;&lt; Other == self.set_upstream(other)</span> |
| |
| <span class="sd"> If "Other" is a DAG, the DAG is assigned to the Operator.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">other</span><span class="p">,</span> <span class="n">DAG</span><span class="p">):</span> |
| <span class="c1"># if this dag is already assigned, do nothing</span> |
| <span class="c1"># otherwise, do normal dag assignment</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">()</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="ow">is</span> <span class="n">other</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">other</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">set_upstream</span><span class="p">(</span><span class="n">other</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">other</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.__rrshift__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__rrshift__">[docs]</a> <span class="k">def</span> <span class="nf">__rrshift__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Called for [DAG] &gt;&gt; [Operator] because DAGs don't have</span> |
| <span class="sd"> __rshift__ operators.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="fm">__lshift__</span><span class="p">(</span><span class="n">other</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.__rlshift__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__rlshift__">[docs]</a> <span class="k">def</span> <span class="nf">__rlshift__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">other</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Called for [DAG] &lt;&lt; [Operator] because DAGs don't have</span> |
| <span class="sd"> __lshift__ operators.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="fm">__rshift__</span><span class="p">(</span><span class="n">other</span><span class="p">)</span> |
| <span class="k">return</span> <span class="bp">self</span></div> |
| |
| <span class="c1"># /Composing Operators ---------------------------------------------</span> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="BaseOperator.dag"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.dag">[docs]</a> <span class="k">def</span> <span class="nf">dag</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Returns the Operator's DAG if set, otherwise raises an error</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dag</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span> |
| <span class="s1">'Operator </span><span class="si">{}</span><span class="s1"> has not been assigned to a DAG yet'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span></div> |
| |
| <span class="nd">@dag</span><span class="o">.</span><span class="n">setter</span> |
| <span class="k">def</span> <span class="nf">dag</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dag</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Operators can be assigned to one DAG, one time. Repeat assignments to</span> |
| <span class="sd"> that same DAG are ok.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">dag</span><span class="p">,</span> <span class="n">DAG</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span> |
| <span class="s1">'Expected DAG; received </span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dag</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">))</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">()</span> <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="ow">is</span> <span class="ow">not</span> <span class="n">dag</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span> |
| <span class="s2">"The DAG assigned to </span><span class="si">{}</span><span class="s2"> can not be changed."</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="p">))</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">dag</span><span class="o">.</span><span class="n">task_dict</span><span class="p">:</span> |
| <span class="n">dag</span><span class="o">.</span><span class="n">add_task</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">_dag</span> <span class="o">=</span> <span class="n">dag</span> |
| |
| <div class="viewcode-block" id="BaseOperator.has_dag"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.has_dag">[docs]</a> <span class="k">def</span> <span class="nf">has_dag</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Returns True if the Operator has been assigned to a DAG.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'_dag'</span><span class="p">,</span> <span class="kc">None</span><span class="p">)</span> <span class="ow">is</span> <span class="ow">not</span> <span class="kc">None</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="BaseOperator.dag_id"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.dag_id">[docs]</a> <span class="k">def</span> <span class="nf">dag_id</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">dag_id</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="s1">'adhoc_'</span> <span class="o">+</span> <span class="bp">self</span><span class="o">.</span><span class="n">owner</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="BaseOperator.deps"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.deps">[docs]</a> <span class="k">def</span> <span class="nf">deps</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Returns the list of dependencies for the operator. These differ from execution</span> |
| <span class="sd"> context dependencies in that they are specific to tasks and can be</span> |
| <span class="sd"> extended/overridden by subclasses.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="n">NotInRetryPeriodDep</span><span class="p">(),</span> |
| <span class="n">PrevDagrunDep</span><span class="p">(),</span> |
| <span class="n">TriggerRuleDep</span><span class="p">(),</span> |
| <span class="p">}</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="BaseOperator.schedule_interval"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.schedule_interval">[docs]</a> <span class="k">def</span> <span class="nf">schedule_interval</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> The schedule interval of the DAG always wins over individual tasks so</span> |
| <span class="sd"> that tasks within a DAG always line up. The task still needs a</span> |
| <span class="sd"> schedule_interval as it may not be attached to a DAG.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">_schedule_interval</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_schedule_interval</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="BaseOperator.priority_weight_total"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.priority_weight_total">[docs]</a> <span class="k">def</span> <span class="nf">priority_weight_total</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">weight_rule</span> <span class="o">==</span> <span class="n">WeightRule</span><span class="o">.</span><span class="n">ABSOLUTE</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">weight_rule</span> <span class="o">==</span> <span class="n">WeightRule</span><span class="o">.</span><span class="n">DOWNSTREAM</span><span class="p">:</span> |
| <span class="n">upstream</span> <span class="o">=</span> <span class="kc">False</span> |
| <span class="k">elif</span> <span class="bp">self</span><span class="o">.</span><span class="n">weight_rule</span> <span class="o">==</span> <span class="n">WeightRule</span><span class="o">.</span><span class="n">UPSTREAM</span><span class="p">:</span> |
| <span class="n">upstream</span> <span class="o">=</span> <span class="kc">True</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">upstream</span> <span class="o">=</span> <span class="kc">False</span> |
| |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">priority_weight</span> <span class="o">+</span> <span class="nb">sum</span><span class="p">(</span> |
| <span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">task_id</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dag</span><span class="o">.</span><span class="n">task_dict</span><span class="p">[</span><span class="n">task_id</span><span class="p">]</span><span class="o">.</span><span class="n">priority_weight</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">get_flat_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="n">upstream</span><span class="p">))</span> |
| <span class="p">)</span></div> |
| |
| <span class="nd">@cached_property</span> |
| <div class="viewcode-block" id="BaseOperator.operator_extra_link_dict"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.operator_extra_link_dict">[docs]</a> <span class="k">def</span> <span class="nf">operator_extra_link_dict</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">:</span> <span class="n">link</span> <span class="k">for</span> <span class="n">link</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">operator_extra_links</span><span class="p">}</span></div> |
| |
| <span class="nd">@cached_property</span> |
| <div class="viewcode-block" id="BaseOperator.global_operator_extra_link_dict"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.global_operator_extra_link_dict">[docs]</a> <span class="k">def</span> <span class="nf">global_operator_extra_link_dict</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="kn">from</span> <span class="nn">airflow.plugins_manager</span> <span class="k">import</span> <span class="n">global_operator_extra_links</span> |
| <span class="k">return</span> <span class="p">{</span><span class="n">link</span><span class="o">.</span><span class="n">name</span><span class="p">:</span> <span class="n">link</span> <span class="k">for</span> <span class="n">link</span> <span class="ow">in</span> <span class="n">global_operator_extra_links</span><span class="p">}</span></div> |
| |
| <span class="nd">@prepare_lineage</span> |
| <div class="viewcode-block" id="BaseOperator.pre_execute"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.pre_execute">[docs]</a> <span class="k">def</span> <span class="nf">pre_execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> This hook is triggered right before self.execute() is called.</span> |
| <span class="sd"> """</span> |
| <span class="k">pass</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.execute"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.execute">[docs]</a> <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> This is the main method to derive when creating an operator.</span> |
| <span class="sd"> Context is the same dictionary used as when rendering jinja templates.</span> |
| |
| <span class="sd"> Refer to get_template_context for more context.</span> |
| <span class="sd"> """</span> |
| <span class="k">raise</span> <span class="ne">NotImplementedError</span><span class="p">()</span></div> |
| |
| <span class="nd">@apply_lineage</span> |
| <div class="viewcode-block" id="BaseOperator.post_execute"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.post_execute">[docs]</a> <span class="k">def</span> <span class="nf">post_execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">result</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> This hook is triggered right after self.execute() is called.</span> |
| <span class="sd"> It is passed the execution context and any results returned by the</span> |
| <span class="sd"> operator.</span> |
| <span class="sd"> """</span> |
| <span class="k">pass</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.on_kill"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.on_kill">[docs]</a> <span class="k">def</span> <span class="nf">on_kill</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Override this method to cleanup subprocesses when a task instance</span> |
| <span class="sd"> gets killed. Any use of the threading, subprocess or multiprocessing</span> |
| <span class="sd"> module within an operator needs to be cleaned up or it will leave</span> |
| <span class="sd"> ghost processes behind.</span> |
| <span class="sd"> """</span> |
| <span class="k">pass</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.__deepcopy__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__deepcopy__">[docs]</a> <span class="k">def</span> <span class="nf">__deepcopy__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">memo</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Hack sorting double chained task lists by task_id to avoid hitting</span> |
| <span class="sd"> max_depth on deepcopy operations.</span> |
| <span class="sd"> """</span> |
| <span class="n">sys</span><span class="o">.</span><span class="n">setrecursionlimit</span><span class="p">(</span><span class="mi">5000</span><span class="p">)</span> <span class="c1"># TODO fix this in a better way</span> |
| <span class="bp">cls</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="bp">cls</span><span class="o">.</span><span class="fm">__new__</span><span class="p">(</span><span class="bp">cls</span><span class="p">)</span> |
| <span class="n">memo</span><span class="p">[</span><span class="nb">id</span><span class="p">(</span><span class="bp">self</span><span class="p">)]</span> <span class="o">=</span> <span class="n">result</span> |
| |
| <span class="n">shallow_copy</span> <span class="o">=</span> <span class="bp">cls</span><span class="o">.</span><span class="n">shallow_copy_attrs</span> <span class="o">+</span> <span class="bp">cls</span><span class="o">.</span><span class="n">_base_operator_shallow_copy_attrs</span> |
| |
| <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="o">.</span><span class="n">items</span><span class="p">()):</span> |
| <span class="k">if</span> <span class="n">k</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">shallow_copy</span><span class="p">:</span> |
| <span class="nb">setattr</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="n">k</span><span class="p">,</span> <span class="n">copy</span><span class="o">.</span><span class="n">deepcopy</span><span class="p">(</span><span class="n">v</span><span class="p">,</span> <span class="n">memo</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="nb">setattr</span><span class="p">(</span><span class="n">result</span><span class="p">,</span> <span class="n">k</span><span class="p">,</span> <span class="n">copy</span><span class="o">.</span><span class="n">copy</span><span class="p">(</span><span class="n">v</span><span class="p">))</span> |
| <span class="k">return</span> <span class="n">result</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.__getstate__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__getstate__">[docs]</a> <span class="k">def</span> <span class="nf">__getstate__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">state</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span><span class="p">)</span> |
| <span class="k">del</span> <span class="n">state</span><span class="p">[</span><span class="s1">'_log'</span><span class="p">]</span> |
| |
| <span class="k">return</span> <span class="n">state</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.__setstate__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__setstate__">[docs]</a> <span class="k">def</span> <span class="nf">__setstate__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">state</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="vm">__dict__</span> <span class="o">=</span> <span class="n">state</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_log</span> <span class="o">=</span> <span class="n">logging</span><span class="o">.</span><span class="n">getLogger</span><span class="p">(</span><span class="s2">"airflow.task.operators"</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.render_template_from_field"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.render_template_from_field">[docs]</a> <span class="k">def</span> <span class="nf">render_template_from_field</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">attr</span><span class="p">,</span> <span class="n">content</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">jinja_env</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Renders a template from a field. If the field is a string, it will</span> |
| <span class="sd"> simply render the string and return the result. If it is a collection or</span> |
| <span class="sd"> nested set of collections, it will traverse the structure and render</span> |
| <span class="sd"> all elements in it. If the field has another type, it will return it as it is.</span> |
| <span class="sd"> """</span> |
| <span class="n">rt</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">render_template</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">):</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="n">jinja_env</span><span class="o">.</span><span class="n">from_string</span><span class="p">(</span><span class="n">content</span><span class="p">)</span><span class="o">.</span><span class="n">render</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="nb">tuple</span><span class="p">):</span> |
| <span class="k">if</span> <span class="nb">type</span><span class="p">(</span><span class="n">content</span><span class="p">)</span> <span class="ow">is</span> <span class="ow">not</span> <span class="nb">tuple</span><span class="p">:</span> |
| <span class="c1"># Special case for named tuples</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="n">content</span><span class="o">.</span><span class="vm">__class__</span><span class="p">(</span><span class="o">*</span><span class="p">(</span><span class="n">rt</span><span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">e</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">content</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="nb">tuple</span><span class="p">(</span><span class="n">rt</span><span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">e</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">content</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="p">[</span><span class="n">rt</span><span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">e</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span> <span class="k">for</span> <span class="n">e</span> <span class="ow">in</span> <span class="n">content</span><span class="p">]</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="p">{</span> |
| <span class="n">k</span><span class="p">:</span> <span class="n">rt</span><span class="p">(</span><span class="s2">"</span><span class="si">{}</span><span class="s2">[</span><span class="si">{}</span><span class="s2">]"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">k</span><span class="p">),</span> <span class="n">v</span><span class="p">,</span> <span class="n">context</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">k</span><span class="p">,</span> <span class="n">v</span> <span class="ow">in</span> <span class="nb">list</span><span class="p">(</span><span class="n">content</span><span class="o">.</span><span class="n">items</span><span class="p">())}</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">result</span> <span class="o">=</span> <span class="n">content</span> |
| <span class="k">return</span> <span class="n">result</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.render_template"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.render_template">[docs]</a> <span class="k">def</span> <span class="nf">render_template</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">attr</span><span class="p">,</span> <span class="n">content</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Renders a template either from a file or directly in a field, and returns</span> |
| <span class="sd"> the rendered result.</span> |
| <span class="sd"> """</span> |
| <span class="n">jinja_env</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_template_env</span><span class="p">()</span> |
| |
| <span class="n">exts</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="n">template_ext</span> |
| <span class="k">if</span> <span class="p">(</span> |
| <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">)</span> <span class="ow">and</span> |
| <span class="nb">any</span><span class="p">([</span><span class="n">content</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="n">ext</span><span class="p">)</span> <span class="k">for</span> <span class="n">ext</span> <span class="ow">in</span> <span class="n">exts</span><span class="p">])):</span> |
| <span class="k">return</span> <span class="n">jinja_env</span><span class="o">.</span><span class="n">get_template</span><span class="p">(</span><span class="n">content</span><span class="p">)</span><span class="o">.</span><span class="n">render</span><span class="p">(</span><span class="o">**</span><span class="n">context</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">render_template_from_field</span><span class="p">(</span><span class="n">attr</span><span class="p">,</span> <span class="n">content</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">jinja_env</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.get_template_env"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_template_env">[docs]</a> <span class="k">def</span> <span class="nf">get_template_env</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">get_template_env</span><span class="p">()</span> \ |
| <span class="k">if</span> <span class="nb">hasattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="s1">'dag'</span><span class="p">)</span> \ |
| <span class="k">else</span> <span class="n">jinja2</span><span class="o">.</span><span class="n">Environment</span><span class="p">(</span><span class="n">cache_size</span><span class="o">=</span><span class="mi">0</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.prepare_template"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.prepare_template">[docs]</a> <span class="k">def</span> <span class="nf">prepare_template</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Hook that is triggered after the templated fields get replaced</span> |
| <span class="sd"> by their content. If you need your operator to alter the</span> |
| <span class="sd"> content of the file before the template is rendered,</span> |
| <span class="sd"> it should override this method to do so.</span> |
| <span class="sd"> """</span> |
| <span class="k">pass</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.resolve_template_files"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.resolve_template_files">[docs]</a> <span class="k">def</span> <span class="nf">resolve_template_files</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># Getting the content of files for template_field / template_ext</span> |
| <span class="k">for</span> <span class="n">attr</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">template_fields</span><span class="p">:</span> |
| <span class="n">content</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">attr</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">content</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">continue</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">)</span> <span class="ow">and</span> \ |
| <span class="nb">any</span><span class="p">([</span><span class="n">content</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="n">ext</span><span class="p">)</span> <span class="k">for</span> <span class="n">ext</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">template_ext</span><span class="p">]):</span> |
| <span class="n">env</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_template_env</span><span class="p">()</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="nb">setattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">attr</span><span class="p">,</span> <span class="n">env</span><span class="o">.</span><span class="n">loader</span><span class="o">.</span><span class="n">get_source</span><span class="p">(</span><span class="n">env</span><span class="p">,</span> <span class="n">content</span><span class="p">)[</span><span class="mi">0</span><span class="p">])</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="nb">list</span><span class="p">):</span> |
| <span class="n">env</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">get_template_env</span><span class="p">()</span> |
| <span class="k">for</span> <span class="n">i</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="nb">len</span><span class="p">(</span><span class="n">content</span><span class="p">)):</span> |
| <span class="k">if</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">[</span><span class="n">i</span><span class="p">],</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">)</span> <span class="ow">and</span> \ |
| <span class="nb">any</span><span class="p">([</span><span class="n">content</span><span class="p">[</span><span class="n">i</span><span class="p">]</span><span class="o">.</span><span class="n">endswith</span><span class="p">(</span><span class="n">ext</span><span class="p">)</span> <span class="k">for</span> <span class="n">ext</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">template_ext</span><span class="p">]):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">content</span><span class="p">[</span><span class="n">i</span><span class="p">]</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">loader</span><span class="o">.</span><span class="n">get_source</span><span class="p">(</span><span class="n">env</span><span class="p">,</span> <span class="n">content</span><span class="p">[</span><span class="n">i</span><span class="p">])[</span><span class="mi">0</span><span class="p">]</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="n">e</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">prepare_template</span><span class="p">()</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="BaseOperator.upstream_list"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.upstream_list">[docs]</a> <span class="k">def</span> <span class="nf">upstream_list</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""@property: list of tasks directly upstream"""</span> |
| <span class="k">return</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">tid</span><span class="p">)</span> <span class="k">for</span> <span class="n">tid</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span><span class="p">]</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="BaseOperator.upstream_task_ids"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.upstream_task_ids">[docs]</a> <span class="k">def</span> <span class="nf">upstream_task_ids</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="BaseOperator.downstream_list"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.downstream_list">[docs]</a> <span class="k">def</span> <span class="nf">downstream_list</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""@property: list of tasks directly downstream"""</span> |
| <span class="k">return</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">get_task</span><span class="p">(</span><span class="n">tid</span><span class="p">)</span> <span class="k">for</span> <span class="n">tid</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">_downstream_task_ids</span><span class="p">]</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="BaseOperator.downstream_task_ids"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.downstream_task_ids">[docs]</a> <span class="k">def</span> <span class="nf">downstream_task_ids</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_downstream_task_ids</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="BaseOperator.clear"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.clear">[docs]</a> <span class="k">def</span> <span class="nf">clear</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> |
| <span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">downstream</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Clears the state of task instances associated with the task, following</span> |
| <span class="sd"> the parameters specified.</span> |
| <span class="sd"> """</span> |
| <span class="n">TI</span> <span class="o">=</span> <span class="n">TaskInstance</span> |
| <span class="n">qry</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TI</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="n">start_date</span><span class="p">:</span> |
| <span class="n">qry</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">>=</span> <span class="n">start_date</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">end_date</span><span class="p">:</span> |
| <span class="n">qry</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&lt;=</span> <span class="n">end_date</span><span class="p">)</span> |
| |
| <span class="n">tasks</span> <span class="o">=</span> <span class="p">[</span><span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">]</span> |
| |
| <span class="k">if</span> <span class="n">upstream</span><span class="p">:</span> |
| <span class="n">tasks</span> <span class="o">+=</span> <span class="p">[</span> |
| <span class="n">t</span><span class="o">.</span><span class="n">task_id</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_flat_relatives</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">True</span><span class="p">)]</span> |
| |
| <span class="k">if</span> <span class="n">downstream</span><span class="p">:</span> |
| <span class="n">tasks</span> <span class="o">+=</span> <span class="p">[</span> |
| <span class="n">t</span><span class="o">.</span><span class="n">task_id</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_flat_relatives</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">)]</span> |
| |
| <span class="n">qry</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TI</span><span class="o">.</span><span class="n">task_id</span><span class="o">.</span><span class="n">in_</span><span class="p">(</span><span class="n">tasks</span><span class="p">))</span> |
| |
| <span class="n">count</span> <span class="o">=</span> <span class="n">qry</span><span class="o">.</span><span class="n">count</span><span class="p">()</span> |
| |
| <span class="n">clear_task_instances</span><span class="p">(</span><span class="n">qry</span><span class="o">.</span><span class="n">all</span><span class="p">(),</span> <span class="n">session</span><span class="p">,</span> <span class="n">dag</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="p">)</span> |
| |
| <span class="n">session</span><span class="o">.</span><span class="n">commit</span><span class="p">()</span> |
| |
| <span class="k">return</span> <span class="n">count</span></div> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="BaseOperator.get_task_instances"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_task_instances">[docs]</a> <span class="k">def</span> <span class="nf">get_task_instances</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Get a set of task instance related to this task for a specific date</span> |
| <span class="sd"> range.</span> |
| <span class="sd"> """</span> |
| <span class="n">end_date</span> <span class="o">=</span> <span class="n">end_date</span> <span class="ow">or</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TaskInstance</span><span class="p">)</span>\ |
| <span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span>\ |
| <span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span>\ |
| <span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">>=</span> <span class="n">start_date</span><span class="p">)</span>\ |
| <span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">&lt;=</span> <span class="n">end_date</span><span class="p">)</span>\ |
| <span class="o">.</span><span class="n">order_by</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span><span class="p">)</span>\ |
| <span class="o">.</span><span class="n">all</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.get_flat_relative_ids"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_flat_relative_ids">[docs]</a> <span class="k">def</span> <span class="nf">get_flat_relative_ids</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> <span class="n">found_descendants</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Get a flat list of relatives' ids, either upstream or downstream.</span> |
| <span class="sd"> """</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="n">found_descendants</span><span class="p">:</span> |
| <span class="n">found_descendants</span> <span class="o">=</span> <span class="nb">set</span><span class="p">()</span> |
| <span class="n">relative_ids</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">get_direct_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="p">)</span> |
| |
| <span class="k">for</span> <span class="n">relative_id</span> <span class="ow">in</span> <span class="n">relative_ids</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">relative_id</span> <span class="ow">not</span> <span class="ow">in</span> <span class="n">found_descendants</span><span class="p">:</span> |
| <span class="n">found_descendants</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">relative_id</span><span class="p">)</span> |
| <span class="n">relative_task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dag</span><span class="o">.</span><span class="n">task_dict</span><span class="p">[</span><span class="n">relative_id</span><span class="p">]</span> |
| <span class="n">relative_task</span><span class="o">.</span><span class="n">get_flat_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="p">,</span> |
| <span class="n">found_descendants</span><span class="p">)</span> |
| |
| <span class="k">return</span> <span class="n">found_descendants</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.get_flat_relatives"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_flat_relatives">[docs]</a> <span class="k">def</span> <span class="nf">get_flat_relatives</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Get a flat list of relatives, either upstream or downstream.</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="nb">list</span><span class="p">(</span><span class="nb">map</span><span class="p">(</span><span class="k">lambda</span> <span class="n">task_id</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">_dag</span><span class="o">.</span><span class="n">task_dict</span><span class="p">[</span><span class="n">task_id</span><span class="p">],</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">get_flat_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="p">)))</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.run"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.run">[docs]</a> <span class="k">def</span> <span class="nf">run</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">start_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">end_date</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">ignore_first_depends_on_past</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="kc">False</span><span class="p">,</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Run a set of task instances for a date range.</span> |
| <span class="sd"> """</span> |
| <span class="n">start_date</span> <span class="o">=</span> <span class="n">start_date</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">start_date</span> |
| <span class="n">end_date</span> <span class="o">=</span> <span class="n">end_date</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">end_date</span> <span class="ow">or</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">()</span> |
| |
| <span class="k">for</span> <span class="n">dt</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">dag</span><span class="o">.</span><span class="n">date_range</span><span class="p">(</span><span class="n">start_date</span><span class="p">,</span> <span class="n">end_date</span><span class="o">=</span><span class="n">end_date</span><span class="p">):</span> |
| <span class="n">TaskInstance</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dt</span><span class="p">)</span><span class="o">.</span><span class="n">run</span><span class="p">(</span> |
| <span class="n">mark_success</span><span class="o">=</span><span class="n">mark_success</span><span class="p">,</span> |
| <span class="n">ignore_depends_on_past</span><span class="o">=</span><span class="p">(</span> |
| <span class="n">dt</span> <span class="o">==</span> <span class="n">start_date</span> <span class="ow">and</span> <span class="n">ignore_first_depends_on_past</span><span class="p">),</span> |
| <span class="n">ignore_ti_state</span><span class="o">=</span><span class="n">ignore_ti_state</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.dry_run"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.dry_run">[docs]</a> <span class="k">def</span> <span class="nf">dry_run</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Dry run'</span><span class="p">)</span> |
| <span class="k">for</span> <span class="n">attr</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">template_fields</span><span class="p">:</span> |
| <span class="n">content</span> <span class="o">=</span> <span class="nb">getattr</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">attr</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">content</span> <span class="ow">and</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">content</span><span class="p">,</span> <span class="n">six</span><span class="o">.</span><span class="n">string_types</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Rendering template for </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">attr</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="n">content</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.get_direct_relative_ids"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_direct_relative_ids">[docs]</a> <span class="k">def</span> <span class="nf">get_direct_relative_ids</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Get the direct relative ids to the current task, upstream or</span> |
| <span class="sd"> downstream.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">upstream</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">_downstream_task_ids</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.get_direct_relatives"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_direct_relatives">[docs]</a> <span class="k">def</span> <span class="nf">get_direct_relatives</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Get the direct relatives to the current task, upstream or</span> |
| <span class="sd"> downstream.</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">upstream</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">upstream_list</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">downstream_list</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.__repr__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.__repr__">[docs]</a> <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="s2">"&lt;Task(</span><span class="si">{self.__class__.__name__}</span><span class="s2">): </span><span class="si">{self.task_id}</span><span class="s2">&gt;"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">=</span><span class="bp">self</span><span class="p">)</span></div> |
| |
| <span class="nd">@property</span> |
| <div class="viewcode-block" id="BaseOperator.task_type"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.task_type">[docs]</a> <span class="k">def</span> <span class="nf">task_type</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.add_only_new"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.add_only_new">[docs]</a> <span class="k">def</span> <span class="nf">add_only_new</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">item_set</span><span class="p">,</span> <span class="n">item</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">item</span> <span class="ow">in</span> <span class="n">item_set</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span> |
| <span class="s1">'Dependency </span><span class="si">{self}</span><span class="s1">, </span><span class="si">{item}</span><span class="s1"> already registered'</span> |
| <span class="s1">''</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="bp">self</span><span class="o">=</span><span class="bp">self</span><span class="p">,</span> <span class="n">item</span><span class="o">=</span><span class="n">item</span><span class="p">))</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="n">item_set</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">item</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator._set_relatives"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator._set_relatives">[docs]</a> <span class="k">def</span> <span class="nf">_set_relatives</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task_or_task_list</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">task_list</span> <span class="o">=</span> <span class="nb">list</span><span class="p">(</span><span class="n">task_or_task_list</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">TypeError</span><span class="p">:</span> |
| <span class="n">task_list</span> <span class="o">=</span> <span class="p">[</span><span class="n">task_or_task_list</span><span class="p">]</span> |
| |
| <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="n">task_list</span><span class="p">:</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">t</span><span class="p">,</span> <span class="n">BaseOperator</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span> |
| <span class="s2">"Relationships can only be set between "</span> |
| <span class="s2">"Operators; received </span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">t</span><span class="o">.</span><span class="vm">__class__</span><span class="o">.</span><span class="vm">__name__</span><span class="p">))</span> |
| |
| <span class="c1"># relationships can only be set if the tasks share a single DAG. Tasks</span> |
| <span class="c1"># without a DAG are assigned to that DAG.</span> |
| <span class="n">dags</span> <span class="o">=</span> <span class="p">{</span><span class="n">t</span><span class="o">.</span><span class="n">_dag</span><span class="o">.</span><span class="n">dag_id</span><span class="p">:</span> <span class="n">t</span><span class="o">.</span><span class="n">_dag</span> <span class="k">for</span> <span class="n">t</span> <span class="ow">in</span> <span class="p">[</span><span class="bp">self</span><span class="p">]</span> <span class="o">+</span> <span class="n">task_list</span> <span class="k">if</span> <span class="n">t</span><span class="o">.</span><span class="n">has_dag</span><span class="p">()}</span> |
| |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">dags</span><span class="p">)</span> <span class="o">></span> <span class="mi">1</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span> |
| <span class="s1">'Tried to set relationships between tasks in '</span> |
| <span class="s1">'more than one DAG: </span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">dags</span><span class="o">.</span><span class="n">values</span><span class="p">()))</span> |
| <span class="k">elif</span> <span class="nb">len</span><span class="p">(</span><span class="n">dags</span><span class="p">)</span> <span class="o">==</span> <span class="mi">1</span><span class="p">:</span> |
| <span class="n">dag</span> <span class="o">=</span> <span class="n">dags</span><span class="o">.</span><span class="n">popitem</span><span class="p">()[</span><span class="mi">1</span><span class="p">]</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span> |
| <span class="s2">"Tried to create relationships between tasks that don't have "</span> |
| <span class="s2">"DAGs yet. Set the DAG for at least one "</span> |
| <span class="s2">"task and try again: </span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">([</span><span class="bp">self</span><span class="p">]</span> <span class="o">+</span> <span class="n">task_list</span><span class="p">))</span> |
| |
| <span class="k">if</span> <span class="n">dag</span> <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">dag</span> |
| |
| <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">task_list</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">dag</span> <span class="ow">and</span> <span class="ow">not</span> <span class="n">task</span><span class="o">.</span><span class="n">has_dag</span><span class="p">():</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">dag</span> <span class="o">=</span> <span class="n">dag</span> |
| <span class="k">if</span> <span class="n">upstream</span><span class="p">:</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">add_only_new</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">get_direct_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">add_only_new</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_upstream_task_ids</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">add_only_new</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">_downstream_task_ids</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">add_only_new</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">get_direct_relative_ids</span><span class="p">(</span><span class="n">upstream</span><span class="o">=</span><span class="kc">True</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.set_downstream"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.set_downstream">[docs]</a> <span class="k">def</span> <span class="nf">set_downstream</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task_or_task_list</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Set a task or a task list to be directly downstream from the current</span> |
| <span class="sd"> task.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_set_relatives</span><span class="p">(</span><span class="n">task_or_task_list</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">False</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.set_upstream"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.set_upstream">[docs]</a> <span class="k">def</span> <span class="nf">set_upstream</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task_or_task_list</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Set a task or a task list to be directly upstream from the current</span> |
| <span class="sd"> task.</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_set_relatives</span><span class="p">(</span><span class="n">task_or_task_list</span><span class="p">,</span> <span class="n">upstream</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.xcom_push"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.xcom_push">[docs]</a> <span class="k">def</span> <span class="nf">xcom_push</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">context</span><span class="p">,</span> |
| <span class="n">key</span><span class="p">,</span> |
| <span class="n">value</span><span class="p">,</span> |
| <span class="n">execution_date</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> See TaskInstance.xcom_push()</span> |
| <span class="sd"> """</span> |
| <span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_push</span><span class="p">(</span> |
| <span class="n">key</span><span class="o">=</span><span class="n">key</span><span class="p">,</span> |
| <span class="n">value</span><span class="o">=</span><span class="n">value</span><span class="p">,</span> |
| <span class="n">execution_date</span><span class="o">=</span><span class="n">execution_date</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.xcom_pull"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.xcom_pull">[docs]</a> <span class="k">def</span> <span class="nf">xcom_pull</span><span class="p">(</span> |
| <span class="bp">self</span><span class="p">,</span> |
| <span class="n">context</span><span class="p">,</span> |
| <span class="n">task_ids</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">key</span><span class="o">=</span><span class="n">XCOM_RETURN_KEY</span><span class="p">,</span> |
| <span class="n">include_prior_dates</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> See TaskInstance.xcom_pull()</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="n">context</span><span class="p">[</span><span class="s1">'ti'</span><span class="p">]</span><span class="o">.</span><span class="n">xcom_pull</span><span class="p">(</span> |
| <span class="n">key</span><span class="o">=</span><span class="n">key</span><span class="p">,</span> |
| <span class="n">task_ids</span><span class="o">=</span><span class="n">task_ids</span><span class="p">,</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">include_prior_dates</span><span class="o">=</span><span class="n">include_prior_dates</span><span class="p">)</span></div> |
| |
| <span class="nd">@cached_property</span> |
| <div class="viewcode-block" id="BaseOperator.extra_links"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.extra_links">[docs]</a> <span class="k">def</span> <span class="nf">extra_links</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> Iterable[str]</span> |
| <span class="k">return</span> <span class="nb">list</span><span class="p">(</span><span class="nb">set</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">operator_extra_link_dict</span><span class="o">.</span><span class="n">keys</span><span class="p">())</span> |
| <span class="o">.</span><span class="n">union</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">global_operator_extra_link_dict</span><span class="o">.</span><span class="n">keys</span><span class="p">()))</span></div> |
| |
| <div class="viewcode-block" id="BaseOperator.get_extra_links"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator.get_extra_links">[docs]</a> <span class="k">def</span> <span class="nf">get_extra_links</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dttm</span><span class="p">,</span> <span class="n">link_name</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> For an operator, gets the URL that the external links specified in</span> |
| <span class="sd"> `extra_links` should point to.</span> |
| <span class="sd"> :raise ValueError: The error message of a ValueError will be passed on through to</span> |
| <span class="sd"> the frontend to show up as a tooltip on the disabled link</span> |
| <span class="sd"> :param dttm: The datetime parsed execution date for the URL being searched for</span> |
| <span class="sd"> :param link_name: The name of the link we're looking for the URL for. Should be</span> |
| <span class="sd"> one of the options specified in `extra_links`</span> |
| <span class="sd"> :return: A URL</span> |
| <span class="sd"> """</span> |
| <span class="k">if</span> <span class="n">link_name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">operator_extra_link_dict</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">operator_extra_link_dict</span><span class="p">[</span><span class="n">link_name</span><span class="p">]</span><span class="o">.</span><span class="n">get_link</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dttm</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">link_name</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">global_operator_extra_link_dict</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">global_operator_extra_link_dict</span><span class="p">[</span><span class="n">link_name</span><span class="p">]</span><span class="o">.</span><span class="n">get_link</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">dttm</span><span class="p">)</span></div></div> |
| |
| |
| <div class="viewcode-block" id="BaseOperatorLink"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperatorLink">[docs]</a><span class="k">class</span> <span class="nc">BaseOperatorLink</span><span class="p">:</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Abstract base class that defines how we get an operator link.</span> |
| <span class="sd"> """</span> |
| |
| <div class="viewcode-block" id="BaseOperatorLink.__metaclass__"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperatorLink.__metaclass__">[docs]</a> <span class="n">__metaclass__</span> <span class="o">=</span> <span class="n">ABCMeta</span></div> |
| |
| <span class="nd">@property</span> |
| <span class="nd">@abstractmethod</span> |
| <div class="viewcode-block" id="BaseOperatorLink.name"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperatorLink.name">[docs]</a> <span class="k">def</span> <span class="nf">name</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># type: () -> str</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Name of the link. This will be the button name on the task UI.</span> |
| |
| <span class="sd"> :return: link name</span> |
| <span class="sd"> """</span> |
| <span class="k">pass</span></div> |
| |
| <span class="nd">@abstractmethod</span> |
| <div class="viewcode-block" id="BaseOperatorLink.get_link"><a class="viewcode-back" href="../../../_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperatorLink.get_link">[docs]</a> <span class="k">def</span> <span class="nf">get_link</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">operator</span><span class="p">,</span> <span class="n">dttm</span><span class="p">):</span> |
| <span class="c1"># type: (BaseOperator, datetime) -> str</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Link to external system.</span> |
| |
| <span class="sd"> :param operator: airflow operator</span> |
| <span class="sd"> :param dttm: datetime</span> |
| <span class="sd"> :return: link to external system</span> |
| <span class="sd"> """</span> |
| <span class="k">pass</span></div></div> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| |
| |
| <footer> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| |
| </p> |
| </div> |
| Built with <a href="https://www.sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| <div class="footer">This page uses <a href="https://analytics.google.com/"> |
| Google Analytics</a> to collect statistics. You can disable it by blocking |
| the JavaScript coming from ssl.google-analytics.com and www.google-analytics.com. Check our |
| <a href="../../../privacy_notice.html">Privacy Policy</a> |
| for more details. |
| <script type="text/javascript"> |
| (function() { |
| var ga = document.createElement('script'); |
| ga.src = ('https:' == document.location.protocol ? |
| 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js'; |
| ga.setAttribute('async', 'true'); |
| var nodes = document.documentElement.childNodes; |
| var i = -1; |
| var node; |
| do { |
| i++; |
| node = nodes[i] |
| } while(node.nodeType !== Node.ELEMENT_NODE); |
| node.appendChild(ga); |
| })(); |
| </script> |
| </div> |
| |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script type="text/javascript"> |
| // A function passed to jQuery() is registered as a DOM-ready callback. |
| jQuery(function () { |
| // NOTE(review): presumably turns on the theme's sidebar navigation, with `true` requesting sticky behaviour; confirm against _static/js/theme.js. |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |