Deferrable Operators & Triggers — Airflow Documentation
<div class="toctree" role="navigation" aria-label="main navigation">
<p class="caption" role="heading"><span class="caption-text">Content</span></p>
<ul class="current">
<li class="toctree-l1"><a class="reference internal" href="../index.html">Home</a></li>
<li class="toctree-l1"><a class="reference internal" href="../project.html">Project</a></li>
<li class="toctree-l1"><a class="reference internal" href="../license.html">License</a></li>
<li class="toctree-l1"><a class="reference internal" href="../start/index.html">Quick Start</a></li>
<li class="toctree-l1"><a class="reference internal" href="../installation/index.html">Installation</a></li>
<li class="toctree-l1"><a class="reference internal" href="../upgrading-from-1-10/index.html">Upgrading from 1.10 to 2</a></li>
<li class="toctree-l1"><a class="reference internal" href="../tutorial.html">Tutorial</a></li>
<li class="toctree-l1"><a class="reference internal" href="../tutorial_taskflow_api.html">Tutorial on the TaskFlow API</a></li>
<li class="toctree-l1"><a class="reference internal" href="../howto/index.html">How-to Guides</a></li>
<li class="toctree-l1"><a class="reference internal" href="../ui.html">UI / Screenshots</a></li>
<li class="toctree-l1 current"><a class="reference internal" href="index.html">Concepts</a><ul class="current">
<li class="toctree-l2"><a class="reference internal" href="overview.html">Architecture Overview</a></li>
<li class="toctree-l2"><a class="reference internal" href="dags.html">DAGs</a></li>
<li class="toctree-l2"><a class="reference internal" href="tasks.html">Tasks</a></li>
<li class="toctree-l2"><a class="reference internal" href="operators.html">Operators</a></li>
<li class="toctree-l2"><a class="reference internal" href="dynamic-task-mapping.html">Dynamic Task Mapping</a></li>
<li class="toctree-l2"><a class="reference internal" href="sensors.html">Sensors</a></li>
<li class="toctree-l2 current"><a class="current reference internal" href="#">Deferrable Operators &amp; Triggers</a><ul>
<li class="toctree-l3"><a class="reference internal" href="#using-deferrable-operators">Using Deferrable Operators</a></li>
<li class="toctree-l3"><a class="reference internal" href="#writing-deferrable-operators">Writing Deferrable Operators</a><ul>
<li class="toctree-l4"><a class="reference internal" href="#triggering-deferral">Triggering Deferral</a></li>
<li class="toctree-l4"><a class="reference internal" href="#writing-triggers">Writing Triggers</a></li>
<li class="toctree-l3"><a class="reference internal" href="#high-availability">High Availability</a></li>
<li class="toctree-l3"><a class="reference internal" href="#smart-sensors">Smart Sensors</a></li>
<li class="toctree-l2"><a class="reference internal" href="smart-sensors.html">Smart Sensors</a></li>
<li class="toctree-l2"><a class="reference internal" href="taskflow.html">TaskFlow</a></li>
<li class="toctree-l2"><a class="reference internal" href="../executor/index.html">Executor</a></li>
<li class="toctree-l2"><a class="reference internal" href="scheduler.html">Scheduler</a></li>
<li class="toctree-l2"><a class="reference internal" href="dagfile-processing.html">DAG File Processing</a></li>
<li class="toctree-l2"><a class="reference internal" href="pools.html">Pools</a></li>
<li class="toctree-l2"><a class="reference internal" href="timetable.html">Timetables</a></li>
<li class="toctree-l2"><a class="reference internal" href="priority-weight.html">Priority Weights</a></li>
<li class="toctree-l2"><a class="reference internal" href="cluster-policies.html">Cluster Policies</a></li>
<li class="toctree-l2"><a class="reference internal" href="xcoms.html">XComs</a></li>
<li class="toctree-l2"><a class="reference internal" href="variables.html">Variables</a></li>
<li class="toctree-l2"><a class="reference internal" href="connections.html">Connections &amp; Hooks</a></li>
<li class="toctree-l2"><a class="reference internal" href="params.html">Params</a></li>
<li class="toctree-l1"><a class="reference internal" href="../executor/index.html">Executor</a></li>
<li class="toctree-l1"><a class="reference internal" href="../dag-run.html">DAG Runs</a></li>
<li class="toctree-l1"><a class="reference internal" href="../plugins.html">Plugins</a></li>
<li class="toctree-l1"><a class="reference internal" href="../security/index.html">Security</a></li>
<li class="toctree-l1"><a class="reference internal" href="../logging-monitoring/index.html">Logging &amp; Monitoring</a></li>
<li class="toctree-l1"><a class="reference internal" href="../timezone.html">Time Zones</a></li>
<li class="toctree-l1"><a class="reference internal" href="../usage-cli.html">Using the CLI</a></li>
<li class="toctree-l1"><a class="reference internal" href="../integration.html">Integration</a></li>
<li class="toctree-l1"><a class="reference internal" href="../kubernetes.html">Kubernetes</a></li>
<li class="toctree-l1"><a class="reference internal" href="../lineage.html">Lineage</a></li>
<li class="toctree-l1"><a class="reference internal" href="../listeners.html">Listeners</a></li>
<li class="toctree-l1"><a class="reference internal" href="../dag-serialization.html">DAG Serialization</a></li>
<li class="toctree-l1"><a class="reference internal" href="../modules_management.html">Modules Management</a></li>
<li class="toctree-l1"><a class="reference internal" href="../release-process.html">Release Policies</a></li>
<li class="toctree-l1"><a class="reference internal" href="../release_notes.html">Release Notes</a></li>
<li class="toctree-l1"><a class="reference internal" href="../best-practices.html">Best Practices</a></li>
<li class="toctree-l1"><a class="reference internal" href="../production-deployment.html">Production Deployment</a></li>
<li class="toctree-l1"><a class="reference internal" href="../faq.html">FAQ</a></li>
<li class="toctree-l1"><a class="reference internal" href="../privacy_notice.html">Privacy Notice</a></li>
<p class="caption" role="heading"><span class="caption-text">References</span></p>
<li class="toctree-l1"><a class="reference internal" href="../operators-and-hooks-ref.html">Operators and hooks</a></li>
<li class="toctree-l1"><a class="reference internal" href="../cli-and-env-variables-ref.html">CLI</a></li>
<li class="toctree-l1"><a class="reference internal" href="../templates-ref.html">Templates</a></li>
<li class="toctree-l1"><a class="reference internal" href="../python-api-ref.html">Python API</a></li>
<li class="toctree-l1"><a class="reference internal" href="../stable-rest-api-ref.html">Stable REST API</a></li>
<li class="toctree-l1"><a class="reference internal" href="../deprecated-rest-api-ref.html">Deprecated REST API</a></li>
<li class="toctree-l1"><a class="reference internal" href="../configurations-ref.html">Configurations</a></li>
<li class="toctree-l1"><a class="reference internal" href="../extra-packages-ref.html">Extra packages</a></li>
<li class="toctree-l1"><a class="reference internal" href="../migrations-ref.html">Database Migrations</a></li>
<div class="section" id="deferrable-operators-triggers">
<h1>Deferrable Operators &amp; Triggers<a class="headerlink" href="#deferrable-operators-triggers" title="Permalink to this heading"></a></h1>
<p>Standard <a class="reference internal" href="operators.html"><span class="doc">Operators</span></a> and <a class="reference internal" href="sensors.html"><span class="doc">Sensors</span></a> take up a full <em>worker slot</em> for the entire time they are running, even if they are idle. For example, if you only have 100 worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor that’s currently running but idle, then you <em>cannot run anything else</em>, even though your entire Airflow cluster is essentially idle. The <code class="docutils literal notranslate"><span class="pre">reschedule</span></code> mode for Sensors solves some of this by letting Sensors run only at fixed intervals, but it is inflexible: time is the only thing that can make a Sensor resume.</p>
<p>This is where <em>Deferrable Operators</em> come in. A deferrable operator is one written with the ability to suspend itself and free up its worker when it knows it has to wait, handing off the job of resuming it to something called a <em>Trigger</em>. As a result, while it is suspended (deferred), it does not take up a worker slot, and your cluster wastes far fewer resources on idle Operators or Sensors.</p>
<p><em>Triggers</em> are small, asynchronous pieces of Python code designed to run together in a single Python process; because they are asynchronous, they can all coexist efficiently. As an overview of how this process works:</p>
<ul class="simple">
<li><p>A task instance (running operator) gets to a point where it has to wait, and defers itself with a trigger tied to the event that should resume it. This frees up the worker to run something else.</p></li>
<li><p>The new Trigger instance is registered inside Airflow and picked up by a <em>triggerer</em> process.</p></li>
<li><p>The trigger runs until it fires, at which point its source task is re-scheduled.</p></li>
<li><p>The scheduler queues the task to resume on a worker node.</p></li>
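<p>The four steps above can be sketched in plain Python as a mental model. This is a toy simulation under stated assumptions, not how Airflow implements deferral: <code class="docutils literal notranslate"><span class="pre">ToyDeferral</span></code>, <code class="docutils literal notranslate"><span class="pre">sleep_trigger</span></code>, <code class="docutils literal notranslate"><span class="pre">ToySensor</span></code>, <code class="docutils literal notranslate"><span class="pre">toy_triggerer</span></code>, and <code class="docutils literal notranslate"><span class="pre">run_all</span></code> are hypothetical names, and a single <code class="docutils literal notranslate"><span class="pre">asyncio</span></code> event loop stands in for the triggerer process. It also hints at why one process can host many triggers: 200 concurrent 0.1-second waits finish in roughly 0.1 seconds of wall-clock time rather than 20.</p>

```python
import asyncio
import time


class ToyDeferral(Exception):
    """Hypothetical stand-in for the exception a task raises to defer itself."""

    def __init__(self, trigger, method_name):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name


async def sleep_trigger(task_id, delay):
    """A toy 'trigger': waits without blocking, then fires one event."""
    await asyncio.sleep(delay)
    return {"task_id": task_id, "fired_at": time.monotonic()}


class ToySensor:
    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self):
        # Step 1: reach a point where we must wait, and defer with a trigger.
        raise ToyDeferral(sleep_trigger(self.task_id, 0.1), "execute_complete")

    def execute_complete(self, event):
        # Step 4: resumed with the trigger's event payload.
        return "done:" + event["task_id"]


async def toy_triggerer(deferrals):
    # Steps 2-3: one process runs every registered trigger concurrently.
    return await asyncio.gather(*(d.trigger for d in deferrals))


def run_all(n):
    deferrals = []
    for i in range(n):
        try:
            ToySensor(f"t{i}").execute()
        except ToyDeferral as deferral:
            deferrals.append(deferral)  # the "worker slot" is now free
    start = time.monotonic()
    events = asyncio.run(toy_triggerer(deferrals))
    elapsed = time.monotonic() - start
    # Resume each task on a brand-new instance, calling the requested method.
    results = [
        getattr(ToySensor(event["task_id"]), deferral.method_name)(event)
        for deferral, event in zip(deferrals, events)
    ]
    return results, elapsed


results, elapsed = run_all(200)
print(len(results), results[0], f"{elapsed:.2f}s")
```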
<p>Using deferrable operators as a DAG author is almost transparent; writing them, however, takes a bit more work.</p>
<div class="admonition note">
<p class="admonition-title">Note</p>
<p>Deferrable Operators &amp; Triggers rely on more recent <code class="docutils literal notranslate"><span class="pre">asyncio</span></code> features, and as a result only work
on Python 3.7 or higher.</p>
<div class="section" id="using-deferrable-operators">
<h2>Using Deferrable Operators<a class="headerlink" href="#using-deferrable-operators" title="Permalink to this heading"></a></h2>
<p>If all you wish to do is use pre-written Deferrable Operators (such as <code class="docutils literal notranslate"><span class="pre">TimeSensorAsync</span></code>, which comes with Airflow), there are only two steps:</p>
<ul class="simple">
<li><p>Ensure your Airflow installation is running at least one <code class="docutils literal notranslate"><span class="pre">triggerer</span></code> process (started with <code class="docutils literal notranslate"><span class="pre">airflow</span> <span class="pre">triggerer</span></code>), as well as the normal <code class="docutils literal notranslate"><span class="pre">scheduler</span></code>.</p></li>
<li><p>Use deferrable operators/sensors in your DAGs.</p></li>
<p>That’s it; everything else will be automatically handled for you. If you’re upgrading existing DAGs, we even provide some API-compatible sensor variants (e.g. <code class="docutils literal notranslate"><span class="pre">TimeSensorAsync</span></code> for <code class="docutils literal notranslate"><span class="pre">TimeSensor</span></code>) that you can swap into your DAG with no other changes required.</p>
<p>Note that you cannot yet use the deferral ability from inside custom PythonOperator/TaskFlow Python functions; it is only available to traditional, class-based Operators at the moment.</p>
<div class="section" id="writing-deferrable-operators">
<span id="deferring-writing"></span><h2>Writing Deferrable Operators<a class="headerlink" href="#writing-deferrable-operators" title="Permalink to this heading"></a></h2>
<p>Writing a deferrable operator takes a bit more work. There are a few main points to consider:</p>
<ul class="simple">
<li><p>Your Operator must defer itself with a Trigger. If there is a Trigger in core Airflow you can use, great; otherwise, you will have to write one.</p></li>
<li><p>Your Operator will be stopped and removed from its worker while deferred, and no state will persist automatically. The only way to persist state is to ask Airflow to resume your Operator at a specific method, or to pass it specific kwargs.</p></li>
<li><p>You can defer multiple times, and you can defer before/after your Operator does significant work, or only defer if certain conditions are met (e.g. a system does not have an immediate answer). Deferral is entirely under your control.</p></li>
<li><p>Any Operator can defer; no special marking on its class is needed, and it’s not limited to Sensors.</p></li>
<div class="section" id="triggering-deferral">
<h3>Triggering Deferral<a class="headerlink" href="#triggering-deferral" title="Permalink to this heading"></a></h3>
<p>If you want to trigger deferral, at any place in your Operator you can call <code class="docutils literal notranslate"><span class="pre">self.defer(trigger,</span> <span class="pre">method_name,</span> <span class="pre">kwargs,</span> <span class="pre">timeout)</span></code>, which will raise a special exception that Airflow will catch. The arguments are:</p>
<ul class="simple">
<li><p><code class="docutils literal notranslate"><span class="pre">trigger</span></code>: An instance of a Trigger that you wish to defer on. It will be serialized into the database.</p></li>
<li><p><code class="docutils literal notranslate"><span class="pre">method_name</span></code>: The method name on your Operator you want Airflow to call when it resumes.</p></li>
<li><p><code class="docutils literal notranslate"><span class="pre">kwargs</span></code>: Additional keyword arguments to pass to the method when it is called. Optional, defaults to <code class="docutils literal notranslate"><span class="pre">{}</span></code>.</p></li>
<li><p><code class="docutils literal notranslate"><span class="pre">timeout</span></code>: A timedelta specifying how long this deferral may last; if it is exceeded, the deferral fails and so does the task instance. Optional, defaults to <code class="docutils literal notranslate"><span class="pre">None</span></code>, meaning no timeout.</p></li>
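<p>To make the four arguments concrete, here is a toy model of what calling <code class="docutils literal notranslate"><span class="pre">defer</span></code> amounts to: raising an exception that carries them. <code class="docutils literal notranslate"><span class="pre">ToyTaskDeferred</span></code>, <code class="docutils literal notranslate"><span class="pre">ToyOperator</span></code>, and <code class="docutils literal notranslate"><span class="pre">capture_deferral</span></code> are hypothetical names, the trigger is a plain string placeholder, and the deadline computation is an assumed illustration of how a timeout could be tracked, not Airflow’s internal code.</p>

```python
from datetime import datetime, timedelta, timezone


class ToyTaskDeferred(Exception):
    """Hypothetical stand-in that carries the four defer() arguments."""

    def __init__(self, trigger, method_name, kwargs=None, timeout=None):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs or {}  # optional, defaults to {}
        self.timeout = timeout      # optional, None means no timeout


class ToyOperator:
    def defer(self, trigger, method_name, kwargs=None, timeout=None):
        # Deferring is implemented by raising: nothing after this call runs.
        raise ToyTaskDeferred(trigger, method_name, kwargs, timeout)

    def execute(self):
        self.defer(
            trigger="wait-for-file",  # placeholder; a real Trigger instance in Airflow
            method_name="resume",
            kwargs={"path": "/tmp/x"},
            timeout=timedelta(minutes=5),
        )


def capture_deferral(op):
    """Toy runner: catch the deferral and turn the timeout into a deadline."""
    try:
        op.execute()
    except ToyTaskDeferred as deferred:
        deadline = None
        if deferred.timeout is not None:
            deadline = datetime.now(timezone.utc) + deferred.timeout
        return deferred, deadline
    return None, None


d, deadline = capture_deferral(ToyOperator())
print(d.method_name, d.kwargs, d.timeout, deadline)
```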
<p>When you opt to defer, your Operator will <em>stop executing at that point and be removed from its current worker</em>. No state - such as local variables, or attributes set on <code class="docutils literal notranslate"><span class="pre">self</span></code> - will persist, and when your Operator is resumed it will be a <em>brand new instance</em> of it. The only way you can pass state from the old instance of the Operator to the new one is via <code class="docutils literal notranslate"><span class="pre">method_name</span></code> and <code class="docutils literal notranslate"><span class="pre">kwargs</span></code>.</p>
<p>When your Operator is resumed, an <code class="docutils literal notranslate"><span class="pre">event</span></code> item will be added to the kwargs passed to the <code class="docutils literal notranslate"><span class="pre">method_name</span></code> method. The <code class="docutils literal notranslate"><span class="pre">event</span></code> object contains the payload from the trigger event that resumed your Operator. Depending on the trigger, this may be useful to your operator (e.g. it’s a status code or URL to fetch results), or it may not be important (it’s just a datetime). Your <code class="docutils literal notranslate"><span class="pre">method_name</span></code> method, however, <em>must</em> accept <code class="docutils literal notranslate"><span class="pre">event</span></code> as a keyword argument.</p>
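<p>The two rules above (resumption happens on a brand-new instance, and <code class="docutils literal notranslate"><span class="pre">event</span></code> is added to the saved kwargs) can be illustrated with a toy resume step. This is a sketch under assumptions: <code class="docutils literal notranslate"><span class="pre">ToyTaskDeferred</span></code>, <code class="docutils literal notranslate"><span class="pre">JobOperator</span></code>, and <code class="docutils literal notranslate"><span class="pre">resume</span></code> are hypothetical, and passing <code class="docutils literal notranslate"><span class="pre">context</span></code> positionally is a simplification of this toy rather than Airflow’s exact calling convention.</p>

```python
class ToyTaskDeferred(Exception):
    """Hypothetical stand-in for the deferral exception."""

    def __init__(self, trigger, method_name, kwargs=None):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs or {}


class JobOperator:
    def execute(self, context):
        job_id = "job-42"                  # pretend we submitted a job
        self.scratch = "lost on deferral"  # instance attributes do NOT survive
        raise ToyTaskDeferred("poll-job", "execute_complete", {"job_id": job_id})

    def execute_complete(self, context, job_id, event):
        # Brand-new instance: self.scratch was never set on this object.
        assert not hasattr(self, "scratch")
        return f"{job_id} finished with status {event['status']}"


def resume(op_class, deferral, event, context=None):
    """Toy resume: a fresh instance gets the saved kwargs plus the event."""
    fresh = op_class()
    method = getattr(fresh, deferral.method_name)
    return method(context, **deferral.kwargs, event=event)


try:
    JobOperator().execute(context=None)
except ToyTaskDeferred as deferral:
    result = resume(JobOperator, deferral, event={"status": "success"})

print(result)
```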
<p>If your Operator returns from either its initial <code class="docutils literal notranslate"><span class="pre">execute()</span></code> call or a subsequent method specified by <code class="docutils literal notranslate"><span class="pre">method_name</span></code>, it is considered complete and finishes executing.</p>
<p>You are free to set <code class="docutils literal notranslate"><span class="pre">method_name</span></code> to <code class="docutils literal notranslate"><span class="pre">execute</span></code> if you want your Operator to have one entrypoint, but it, too, will have to accept <code class="docutils literal notranslate"><span class="pre">event</span></code> as an optional keyword argument.</p>
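<p>A single entrypoint combines naturally with deferring several times. The sketch below is a toy under assumptions: <code class="docutils literal notranslate"><span class="pre">PollUntilReady</span></code> and <code class="docutils literal notranslate"><span class="pre">drive</span></code> are hypothetical, and <code class="docutils literal notranslate"><span class="pre">drive</span></code> plays the role of Airflow by feeding one made-up trigger event per deferral. Because every method call lands on a new instance, the attempt counter travels in kwargs rather than on <code class="docutils literal notranslate"><span class="pre">self</span></code>.</p>

```python
class ToyTaskDeferred(Exception):
    """Hypothetical stand-in for the deferral exception."""

    def __init__(self, trigger, method_name, kwargs=None):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs or {}


class PollUntilReady:
    """Single entrypoint: execute() handles the first run and every resume."""

    def execute(self, context, attempt=0, event=None):
        if event is not None and event.get("ready"):
            return f"ready after {attempt} deferral(s)"
        # First run, or not ready yet: defer again, carrying state in kwargs.
        raise ToyTaskDeferred("check-status", "execute", {"attempt": attempt + 1})


def drive(events):
    """Toy driver: first call has no event, then one event per deferral."""
    kwargs, method = {}, "execute"
    for event in [None] + events:
        try:
            return getattr(PollUntilReady(), method)(None, event=event, **kwargs)
        except ToyTaskDeferred as deferred:
            kwargs, method = deferred.kwargs, deferred.method_name
    raise RuntimeError("task is still deferred")


print(drive([{"ready": False}, {"ready": True}]))
```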
<p>Here’s a basic example of how a sensor might trigger deferral:</p>
<div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="kn">from</span> <span class="nn">datetime</span> <span class="kn">import</span> <span class="n">timedelta</span>

<span class="kn">from</span> <span class="nn">airflow.sensors.base</span> <span class="kn">import</span> <span class="n">BaseSensorOperator</span>
<span class="kn">from</span> <span class="nn">airflow.triggers.temporal</span> <span class="kn">import</span> <span class="n">TimeDeltaTrigger</span>


<span class="k">class</span> <span class="nc">WaitOneHourSensor</span><span class="p">(</span><span class="n">BaseSensorOperator</span><span class="p">):</span>
    <span class="k">def</span> <span class="nf">execute</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">):</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">defer</span><span class="p">(</span><span class="n">trigger</span><span class="o">=</span><span class="n">TimeDeltaTrigger</span><span class="p">(</span><span class="n">timedelta</span><span class="p">(</span><span class="n">hours</span><span class="o">=</span><span class="mi">1</span><span class="p">)),</span> <span class="n">method_name</span><span class="o">=</span><span class="s2">&quot;execute_complete&quot;</span><span class="p">)</span>

    <span class="k">def</span> <span class="nf">execute_complete</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">context</span><span class="p">,</span> <span class="n">event</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
        <span class="c1"># We have no more work to do here. Mark as complete.</span>
        <span class="k">return</span>
</pre></div></div>
<p>This Sensor is just a thin wrapper around the Trigger: all it does is defer to the trigger and specify a different method to come back to when the trigger fires. Because that method returns immediately, the Sensor is marked as successful.</p>
<p>Under the hood, <code class="docutils literal notranslate"><span class="pre">self.defer</span></code> raises the <code class="docutils literal notranslate"><span class="pre">TaskDeferred</span></code> exception, so it will work anywhere inside your Operator’s code, even buried many nested calls deep inside <code class="docutils literal notranslate"><span class="pre">execute()</span></code>. You are free to raise <code class="docutils literal notranslate"><span class="pre">TaskDeferred</span></code> manually if you wish; it takes the same arguments as <code class="docutils literal notranslate"><span class="pre">self.defer</span></code>.</p>
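<p>Because deferral is just an exception, it unwinds however many frames sit between the call and the top of <code class="docutils literal notranslate"><span class="pre">execute()</span></code>. A toy illustration, where <code class="docutils literal notranslate"><span class="pre">ToyTaskDeferred</span></code>, <code class="docutils literal notranslate"><span class="pre">wait_for_partition</span></code>, <code class="docutils literal notranslate"><span class="pre">prepare</span></code>, <code class="docutils literal notranslate"><span class="pre">LoadOperator</span></code>, and <code class="docutils literal notranslate"><span class="pre">run</span></code> are all hypothetical names and the runner is a simplification:</p>

```python
class ToyTaskDeferred(Exception):
    """Hypothetical stand-in for the deferral exception."""

    def __init__(self, trigger, method_name):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name


def wait_for_partition(table):
    # Deep helper: deferring here still unwinds the whole call stack.
    raise ToyTaskDeferred(f"partition-trigger:{table}", "execute_complete")


def prepare(tables):
    for table in tables:
        wait_for_partition(table)


class LoadOperator:
    def execute(self, context):
        prepare(["sales", "users"])
        return "never reached"


def run(op):
    """Toy runner: the deferral surfaces here, two calls below execute()."""
    try:
        return op.execute(context=None)
    except ToyTaskDeferred as deferred:
        return ("deferred", deferred.trigger, deferred.method_name)


outcome = run(LoadOperator())
print(outcome)
```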
<p>Note that <code class="docutils literal notranslate"><span class="pre">execution_timeout</span></code> on Operators is measured over the <em>total runtime</em>, not over the individual executions between deferrals. This means that if <code class="docutils literal notranslate"><span class="pre">execution_timeout</span></code> is set, an Operator may fail while it’s deferred, or while it’s running after a deferral, even if it has only been resumed for a few seconds.</p>
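<p>The arithmetic behind that warning, as a toy check. It assumes, per the paragraph above, that the budget is counted as wall-clock time from the task’s first start (deferred time included); <code class="docutils literal notranslate"><span class="pre">check_execution_timeout</span></code> is a hypothetical helper, not an Airflow function.</p>

```python
from datetime import datetime, timedelta


def check_execution_timeout(start, now, execution_timeout):
    """Toy check: True if the time elapsed since `start` exceeds the budget."""
    return (now - start) > execution_timeout


first_start = datetime(2022, 1, 1, 12, 0, 0)
timeout = timedelta(minutes=30)

# Ran for 1 minute, deferred for 40 minutes, resumed 5 seconds ago.
resumed_at = first_start + timedelta(minutes=41)
now = resumed_at + timedelta(seconds=5)

# Total-runtime view (what the text describes): already over budget.
print(check_execution_timeout(first_start, now, timeout))  # True
# Per-execution view (NOT how execution_timeout is counted): only 5 s used.
print(check_execution_timeout(resumed_at, now, timeout))   # False
```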
<div class="section" id="writing-triggers">
<h3>Writing Triggers<a class="headerlink" href="#writing-triggers" title="Permalink to this heading"></a></h3>
<p>A Trigger is written as a class that inherits from <code class="docutils literal notranslate"><span class="pre">BaseTrigger</span></code>, and implements three methods:</p>
<ul class="simple">
<li><p><code class="docutils literal notranslate"><span class="pre">__init__</span></code>, to receive arguments from Operators instantiating it</p></li>
<li><p><code class="docutils literal notranslate"><span class="pre">run</span></code>, an asynchronous method that runs its logic and yields one or more <code class="docutils literal notranslate"><span class="pre">TriggerEvent</span></code> instances as an asynchronous generator</p></li>
<li><p><code class="docutils literal notranslate"><span class="pre">serialize</span></code>, which returns the information needed to re-construct this trigger, as a tuple of the classpath and the keyword arguments to pass to <code class="docutils literal notranslate"><span class="pre">__init__</span></code></p></li>
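<p>The three-method contract can be sketched without Airflow to show how <code class="docutils literal notranslate"><span class="pre">serialize</span></code> and <code class="docutils literal notranslate"><span class="pre">__init__</span></code> pair up. Assumptions: <code class="docutils literal notranslate"><span class="pre">SleepTrigger</span></code>, <code class="docutils literal notranslate"><span class="pre">ToyTriggerEvent</span></code>, <code class="docutils literal notranslate"><span class="pre">REGISTRY</span></code>, <code class="docutils literal notranslate"><span class="pre">rehydrate</span></code>, and <code class="docutils literal notranslate"><span class="pre">first_event</span></code> are hypothetical, the classpath string is made up, and a dictionary lookup stands in for importing the class from its classpath.</p>

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Tuple


class ToyTriggerEvent:
    """Hypothetical stand-in for TriggerEvent: just wraps a payload."""

    def __init__(self, payload: Any):
        self.payload = payload


class SleepTrigger:
    """Toy trigger following the __init__ / run / serialize contract."""

    def __init__(self, seconds: float):
        self.seconds = seconds

    def serialize(self) -> Tuple[str, Dict[str, Any]]:
        # (classpath, kwargs for __init__); this classpath is hypothetical.
        return ("example.triggers.SleepTrigger", {"seconds": self.seconds})

    async def run(self) -> AsyncIterator[ToyTriggerEvent]:
        await asyncio.sleep(self.seconds)  # non-blocking wait
        yield ToyTriggerEvent({"slept": self.seconds})


# Stand-in for importing the class named by the classpath.
REGISTRY = {"example.triggers.SleepTrigger": SleepTrigger}


def rehydrate(classpath: str, kwargs: Dict[str, Any]):
    return REGISTRY[classpath](**kwargs)


async def first_event(trigger):
    """Consume only the first event, then close the async generator."""
    agen = trigger.run()
    try:
        event = await agen.__anext__()
    except StopAsyncIteration:
        raise RuntimeError("trigger exited without yielding an event") from None
    finally:
        await agen.aclose()
    return event.payload


classpath, kwargs = SleepTrigger(0.01).serialize()
restored = rehydrate(classpath, kwargs)  # what a triggerer process would do
payload = asyncio.run(first_event(restored))
print(payload)
```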
<p>There are also some design constraints to be aware of:</p>
<ul class="simple">
<li><p>The <code class="docutils literal notranslate"><span class="pre">run</span></code> method <em>must be asynchronous</em> (using Python’s asyncio), and correctly <code class="docutils literal notranslate"><span class="pre">await</span></code> whenever it does a blocking operation.</p></li>
<li><p><code class="docutils literal notranslate"><span class="pre">run</span></code> must <code class="docutils literal notranslate"><span class="pre">yield</span></code> its TriggerEvents, not return them. If it returns before yielding at least one event, Airflow will consider this an error and fail any Task Instances waiting on it. If it throws an exception, Airflow will also fail any dependent task instances.</p></li>
<li><p>You should assume that a trigger instance may run <em>more than once</em> (this can happen if a network partition occurs and Airflow re-launches a trigger on a separate machine), so be mindful of side effects. For example, you might not want to use a trigger to insert database rows.</p></li>
<li><p>If your trigger is designed to emit more than one event (not currently supported), then each emitted event <em>must</em> contain a payload that can be used to deduplicate events if the trigger is being run in multiple places. If you only fire one event and don’t need to pass information back to the Operator, you can just set the payload to <code class="docutils literal notranslate"><span class="pre">None</span></code>.</p></li>
<li><p>A trigger may be suddenly removed from one triggerer service and started on a new one, for example if subnets are changed and a network partition results, or if there is a deployment. If desired, you may implement the <code class="docutils literal notranslate"><span class="pre">cleanup</span></code> method, which is always called after <code class="docutils literal notranslate"><span class="pre">run</span></code>, whether the trigger exits cleanly or otherwise.</p></li>
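<p>The constraints above can be exercised with a toy runner. This sketch assumes a simplified runner (the hypothetical <code class="docutils literal notranslate"><span class="pre">run_once</span></code>) that takes the first event, always calls <code class="docutils literal notranslate"><span class="pre">cleanup</span></code> afterwards, and treats a <code class="docutils literal notranslate"><span class="pre">run</span></code> that finishes without yielding as a failure; <code class="docutils literal notranslate"><span class="pre">JobDoneTrigger</span></code> and <code class="docutils literal notranslate"><span class="pre">NoEventTrigger</span></code> are also hypothetical. Two copies of the same trigger run concurrently, as might happen after a network partition, and their identical payloads collapse to a single event.</p>

```python
import asyncio


class JobDoneTrigger:
    """Toy trigger whose payload identifies the event."""

    def __init__(self, job_id, cleanups):
        self.job_id = job_id
        self.cleanups = cleanups  # shared list, only used to observe cleanup

    async def run(self):
        await asyncio.sleep(0.01)  # non-blocking wait
        yield {"job_id": self.job_id, "status": "success"}

    def cleanup(self):
        self.cleanups.append(self.job_id)


class NoEventTrigger(JobDoneTrigger):
    async def run(self):
        return
        yield  # unreachable, but makes run() an async generator


async def run_once(trigger):
    """Toy runner: first event wins; cleanup runs however run() ends."""
    agen = trigger.run()
    try:
        return await agen.__anext__()
    except StopAsyncIteration:
        raise RuntimeError("run() finished without yielding an event") from None
    finally:
        await agen.aclose()
        trigger.cleanup()


async def redundant_run():
    cleanups = []
    # The same trigger running in two places at once.
    copies = [JobDoneTrigger("job-7", cleanups) for _ in range(2)]
    events = await asyncio.gather(*(run_once(t) for t in copies))
    unique = {tuple(sorted(e.items())) for e in events}  # dedupe on payload
    return len(events), len(unique), cleanups


def fails_without_event():
    try:
        asyncio.run(run_once(NoEventTrigger("job-x", [])))
    except RuntimeError:
        return True
    return False


n_events, n_unique, cleanups = asyncio.run(redundant_run())
print(n_events, n_unique, cleanups, fails_without_event())
```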
<div class="admonition note">
<p class="admonition-title">Note</p>
<p>Currently Triggers are only used up to their first event, as they are only used for resuming deferred tasks (which happens on the first event fired). However, we plan to allow DAGs to be launched from triggers in the future, which is where multi-event triggers will be more useful.</p>
<p>Here’s the structure of a basic Trigger:</p>
<div class="highlight-default notranslate"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">asyncio</span>

<span class="kn">from</span> <span class="nn">airflow.triggers.base</span> <span class="kn">import</span> <span class="n">BaseTrigger</span><span class="p">,</span> <span class="n">TriggerEvent</span>
<span class="kn">from</span> <span class="nn">airflow.utils</span> <span class="kn">import</span> <span class="n">timezone</span>


<span class="k">class</span> <span class="nc">DateTimeTrigger</span><span class="p">(</span><span class="n">BaseTrigger</span><span class="p">):</span>
    <span class="k">def</span> <span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">moment</span><span class="p">):</span>
        <span class="nb">super</span><span class="p">()</span><span class="o">.</span><span class="fm">__init__</span><span class="p">()</span>
        <span class="bp">self</span><span class="o">.</span><span class="n">moment</span> <span class="o">=</span> <span class="n">moment</span>

    <span class="k">def</span> <span class="nf">serialize</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
        <span class="k">return</span> <span class="p">(</span><span class="s2">&quot;airflow.triggers.temporal.DateTimeTrigger&quot;</span><span class="p">,</span> <span class="p">{</span><span class="s2">&quot;moment&quot;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">moment</span><span class="p">})</span>

    <span class="k">async</span> <span class="k">def</span> <span class="nf">run</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
        <span class="k">while</span> <span class="bp">self</span><span class="o">.</span><span class="n">moment</span> <span class="o">&gt;</span> <span class="n">timezone</span><span class="o">.</span><span class="n">utcnow</span><span class="p">():</span>
            <span class="k">await</span> <span class="n">asyncio</span><span class="o">.</span><span class="n">sleep</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>
        <span class="k">yield</span> <span class="n">TriggerEvent</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">moment</span><span class="p">)</span>
</pre></div></div>
<p>This is a very simplified version of Airflow’s <code class="docutils literal notranslate"><span class="pre">DateTimeTrigger</span></code>, and you can see several things here:</p>
<ul class="simple">
<li><p><code class="docutils literal notranslate"><span class="pre">__init__</span></code> and <code class="docutils literal notranslate"><span class="pre">serialize</span></code> are written as a pair: <code class="docutils literal notranslate"><span class="pre">serialize</span></code> returns the trigger’s classpath together with the keyword arguments to pass back to <code class="docutils literal notranslate"><span class="pre">__init__</span></code>. The Trigger is instantiated once when the Operator submits it as part of its deferral request, then serialized and re-instantiated on whichever <em>triggerer</em> process runs it.</p></li>
<li><p>The <code class="docutils literal notranslate"><span class="pre">run</span></code> method is declared with <code class="docutils literal notranslate"><span class="pre">async</span> <span class="pre">def</span></code> because it <em>must</em> be asynchronous, and it uses <code class="docutils literal notranslate"><span class="pre">asyncio.sleep</span></code> rather than the regular <code class="docutils literal notranslate"><span class="pre">time.sleep</span></code>, which would block the whole process.</p></li>
<li><p>When it emits its event, it includes <code class="docutils literal notranslate"><span class="pre">self.moment</span></code> as the payload, so if this trigger is being run redundantly on multiple hosts, the resulting events can be de-duplicated.</p></li>
</ul>
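<p>As a minimal, Airflow-free sketch of why the payload matters, the code below uses the hypothetical stand-ins <code class="docutils literal notranslate"><span class="pre">SimpleDateTimeTrigger</span></code> and <code class="docutils literal notranslate"><span class="pre">Event</span></code> in place of <code class="docutils literal notranslate"><span class="pre">DateTimeTrigger</span></code> and <code class="docutils literal notranslate"><span class="pre">TriggerEvent</span></code>. It runs two copies of the same trigger concurrently, as two hosts might, and collects their events in a set. Because both events carry the same <code class="docutils literal notranslate"><span class="pre">moment</span></code>, they collapse into one. The set only illustrates the idea; it is not how Airflow de-duplicates events internally.</p>

```python
import asyncio
import datetime
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    """Hypothetical stand-in for TriggerEvent; frozen, so equal payloads compare and hash equal."""

    payload: object


class SimpleDateTimeTrigger:
    """Hypothetical, Airflow-free stand-in for DateTimeTrigger."""

    def __init__(self, moment: datetime.datetime):
        self.moment = moment

    async def run(self):
        # Poll with a non-blocking sleep until the target moment has passed.
        while self.moment > datetime.datetime.now(datetime.timezone.utc):
            await asyncio.sleep(0.01)
        # The payload describes *what* happened, not *which copy* noticed it.
        yield Event(self.moment)


async def first_event(trigger):
    # Consume the async generator until it yields its first event.
    async for event in trigger.run():
        return event


async def run_redundantly(moment, copies=2):
    # Simulate the same trigger running on several hosts at once.
    triggers = [SimpleDateTimeTrigger(moment) for _ in range(copies)]
    events = await asyncio.gather(*(first_event(t) for t in triggers))
    # Events with equal payloads collapse into one when collected in a set.
    return events, set(events)


if __name__ == "__main__":
    target = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(seconds=0.05)
    events, unique = asyncio.run(run_redundantly(target))
    print(len(events), len(unique))  # 2 1
```
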
<p>Triggers can be as complex or as simple as you like, provided they stay within this contract. They are designed to run in a highly available fashion, automatically distributed among the hosts running the <em>triggerer</em>. We encourage you to avoid any kind of persistent state in a trigger: a trigger should get everything it needs from its <code class="docutils literal notranslate"><span class="pre">__init__</span></code>, so that it can be serialized and moved around freely.</p>
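<p>The round trip this contract relies on can be sketched without Airflow: <code class="docutils literal notranslate"><span class="pre">serialize</span></code> returns a classpath plus keyword arguments, and another process rebuilds the trigger by calling the class with those arguments. In this sketch, <code class="docutils literal notranslate"><span class="pre">WaitUntilTrigger</span></code>, its classpath, the <code class="docutils literal notranslate"><span class="pre">REGISTRY</span></code> lookup (standing in for importing the class by its path) and the JSON encoding are all hypothetical. Note how <code class="docutils literal notranslate"><span class="pre">attempts</span></code>, which is not part of the serialized arguments, is lost when the trigger moves.</p>

```python
import json


class WaitUntilTrigger:
    """Hypothetical trigger: all of its configuration arrives through __init__."""

    def __init__(self, moment: str, poll_interval: float = 1.0):
        self.moment = moment  # ISO-8601 string so plain json can carry it
        self.poll_interval = poll_interval
        self.attempts = 0  # runtime-only state: deliberately NOT serialized

    def serialize(self):
        # (classpath, kwargs): kwargs must be enough to rebuild an equivalent trigger.
        return (
            "example.triggers.WaitUntilTrigger",  # hypothetical classpath
            {"moment": self.moment, "poll_interval": self.poll_interval},
        )


# Stand-in for importing the class from its classpath; keeps the sketch self-contained.
REGISTRY = {"example.triggers.WaitUntilTrigger": WaitUntilTrigger}


def ship(trigger):
    """Simulate handing a trigger to another process: serialize, encode, decode, rebuild."""
    wire = json.dumps(trigger.serialize())
    classpath, kwargs = json.loads(wire)
    return REGISTRY[classpath](**kwargs)


if __name__ == "__main__":
    original = WaitUntilTrigger("2030-01-01T00:00:00+00:00", poll_interval=5.0)
    original.attempts = 3  # state accumulated while running on the first host
    moved = ship(original)
    print(moved.moment, moved.poll_interval, moved.attempts)  # 2030-01-01T00:00:00+00:00 5.0 0
```
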
<p>If you are new to writing asynchronous Python, be very careful when writing your <code class="docutils literal notranslate"><span class="pre">run()</span></code> method. Because of Python’s async model, any code that performs a blocking operation instead of correctly <code class="docutils literal notranslate"><span class="pre">await</span></code>ing an asynchronous one will block the <em>entire process</em>. Airflow will attempt to detect this and warn you in the triggerer logs when it happens, but we strongly suggest setting the environment variable <code class="docutils literal notranslate"><span class="pre">PYTHONASYNCIODEBUG=1</span></code> while you are writing your Trigger, so that Python performs extra checks to help ensure your code is non-blocking. Be especially careful with filesystem calls: if the underlying filesystem is network-backed, they may block.</p>
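<p>To see the blocking problem concretely, the following standalone sketch runs three waits concurrently, first with <code class="docutils literal notranslate"><span class="pre">time.sleep</span></code> and then with <code class="docutils literal notranslate"><span class="pre">asyncio.sleep</span></code>, and times each batch. It also shows one common way to keep a blocking file read off the event loop, <code class="docutils literal notranslate"><span class="pre">asyncio.to_thread</span></code> (Python 3.9+). The function names are illustrative only. When experimenting, <code class="docutils literal notranslate"><span class="pre">asyncio.run(...,</span> <span class="pre">debug=True)</span></code> enables the same asyncio debug mode as <code class="docutils literal notranslate"><span class="pre">PYTHONASYNCIODEBUG=1</span></code>, which logs callbacks that hold the loop for too long.</p>

```python
import asyncio
import os
import tempfile
import time


async def blocking_wait(seconds):
    time.sleep(seconds)  # WRONG inside a trigger: freezes the whole event loop


async def non_blocking_wait(seconds):
    await asyncio.sleep(seconds)  # hands control back to the loop while waiting


async def read_file_off_loop(path):
    # A blocking filesystem call moved to a worker thread (asyncio.to_thread needs Python 3.9+).
    def read():
        with open(path) as f:
            return f.read()

    return await asyncio.to_thread(read)


async def elapsed(waiter, copies=3, seconds=0.1):
    # Run several waits concurrently, as a triggerer runs many triggers, and time the batch.
    start = time.perf_counter()
    await asyncio.gather(*(waiter(seconds) for _ in range(copies)))
    return time.perf_counter() - start


if __name__ == "__main__":
    # Blocking waits run one after another: at least copies * seconds in total.
    print(f"blocking:     {asyncio.run(elapsed(blocking_wait)):.2f}s")
    # Non-blocking waits overlap: close to a single `seconds`.
    print(f"non-blocking: {asyncio.run(elapsed(non_blocking_wait)):.2f}s")

    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "marker.txt")
        with open(path, "w") as f:
            f.write("ready")
        print(asyncio.run(read_file_off_loop(path)))  # ready
```
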
<div class="section" id="high-availability">
<h2>High Availability<a class="headerlink" href="#high-availability" title="Permalink to this heading"></a></h2>
<p>Triggers are designed from the ground up to be highly available; to run a highly available setup, simply run multiple copies of <code class="docutils literal notranslate"><span class="pre">triggerer</span></code> on multiple hosts. Much like the <code class="docutils literal notranslate"><span class="pre">scheduler</span></code>, they automatically co-exist, with correct locking and high availability.</p>
<p>Depending on how much work the triggers are doing, a single <code class="docutils literal notranslate"><span class="pre">triggerer</span></code> host can run anywhere from hundreds to tens of thousands of triggers. By default, each <code class="docutils literal notranslate"><span class="pre">triggerer</span></code> tries to run up to 1000 triggers at once; you can change this with the <code class="docutils literal notranslate"><span class="pre">--capacity</span></code> argument. If more triggers are trying to run than there is capacity across all of your <code class="docutils literal notranslate"><span class="pre">triggerer</span></code> processes, some triggers will be delayed until others have completed.</p>
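<p>The effect of a capacity limit can be simulated with an <code class="docutils literal notranslate"><span class="pre">asyncio.Semaphore</span></code>. This is a sketch of the behaviour described above, not the triggerer’s implementation, and <code class="docutils literal notranslate"><span class="pre">fake_trigger</span></code> and <code class="docutils literal notranslate"><span class="pre">run_with_capacity</span></code> are hypothetical names. With 10 triggers and a capacity of 4, at most 4 run at once and the rest wait, so the batch takes three rounds. On a real deployment the limit is set per process, for example <code class="docutils literal notranslate"><span class="pre">airflow</span> <span class="pre">triggerer</span> <span class="pre">--capacity</span> <span class="pre">2000</span></code>.</p>

```python
import asyncio
import time


async def fake_trigger(seconds):
    # Stand-in for a trigger's run(): waits without blocking the loop.
    await asyncio.sleep(seconds)


async def run_with_capacity(n_triggers, capacity, seconds):
    """Run n_triggers fake triggers, at most `capacity` at a time.

    Returns (elapsed seconds, peak number running concurrently).
    """
    slots = asyncio.Semaphore(capacity)
    running = 0
    peak = 0

    async def guarded():
        nonlocal running, peak
        async with slots:  # triggers beyond capacity wait here, i.e. are delayed
            running += 1
            peak = max(peak, running)
            try:
                await fake_trigger(seconds)
            finally:
                running -= 1

    start = time.perf_counter()
    await asyncio.gather(*(guarded() for _ in range(n_triggers)))
    return time.perf_counter() - start, peak


if __name__ == "__main__":
    # 10 triggers, room for 4: they run in ceil(10 / 4) = 3 rounds of about 0.05 s each.
    took, peak = asyncio.run(run_with_capacity(10, capacity=4, seconds=0.05))
    print(f"peak={peak}, took about {took:.2f}s")
```
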
<p>Airflow tries to run each trigger in only one place at a time, and maintains a heartbeat for every <code class="docutils literal notranslate"><span class="pre">triggerer</span></code> that is currently running. If a <code class="docutils literal notranslate"><span class="pre">triggerer</span></code> dies, or becomes partitioned from the network where Airflow’s database is running, Airflow will automatically reschedule the triggers that were on that host to run elsewhere, after waiting 30 seconds for the machine to reappear.</p>
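<p>The failover rule can be illustrated with a small pure function. Airflow keeps this bookkeeping in its metadata database and this sketch is not that code: the in-memory dictionaries, the name <code class="docutils literal notranslate"><span class="pre">reassign</span></code>, and the least-loaded placement policy are all hypothetical. Only the 30-second grace period comes from the description above.</p>

```python
from datetime import datetime, timedelta, timezone

HEARTBEAT_TIMEOUT = timedelta(seconds=30)  # grace period described above


def reassign(assignments, last_heartbeat, now, timeout=HEARTBEAT_TIMEOUT):
    """Move triggers off triggerers whose last heartbeat is older than `timeout`.

    assignments:    {trigger_id: triggerer_id}
    last_heartbeat: {triggerer_id: datetime of its most recent heartbeat}
    Returns a new {trigger_id: triggerer_id or None} mapping.
    """
    alive = sorted(t for t, beat in last_heartbeat.items() if now - beat <= timeout)
    load = {t: 0 for t in alive}
    for owner in assignments.values():
        if owner in load:
            load[owner] += 1

    result = {}
    for trigger_id, owner in sorted(assignments.items()):
        if owner in load:
            result[trigger_id] = owner  # owner is healthy: leave the trigger where it is
        elif alive:
            # Owner missed its heartbeat window: pick the least-loaded live triggerer.
            target = min(alive, key=lambda t: (load[t], t))
            load[target] += 1
            result[trigger_id] = target
        else:
            result[trigger_id] = None  # no live triggerer: wait until one appears
    return result


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    beats = {
        "host-a": now - timedelta(seconds=5),
        "host-b": now - timedelta(seconds=45),  # silent for longer than 30 s
        "host-c": now - timedelta(seconds=29),
    }
    print(reassign({1: "host-a", 2: "host-b", 3: "host-b", 4: "host-c"}, beats, now))
    # {1: 'host-a', 2: 'host-a', 3: 'host-c', 4: 'host-c'}
```
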
<p>This means it is possible, though unlikely, for a trigger to run in multiple places at once. This is designed into the Trigger contract and is entirely expected: Airflow de-duplicates events fired when a trigger is running in multiple places simultaneously, so this process should be transparent to your Operators.</p>
<p>Note that every extra <code class="docutils literal notranslate"><span class="pre">triggerer</span></code> you run will result in an extra persistent connection to your database.</p>
<div class="section" id="smart-sensors">
<h2>Smart Sensors<a class="headerlink" href="#smart-sensors" title="Permalink to this heading"></a></h2>
<p>Deferrable Operators supersede <a class="reference internal" href="smart-sensors.html"><span class="doc">Smart Sensors</span></a>. Both solve fundamentally the same problem, but Smart Sensors only work for certain Sensor workload styles, have no redundancy, and require a custom DAG to be running at all times.</p>
