blob: 69d7e7497b36622a2318a897e33ec854cd67a9e5 [file] [log] [blame]
<!DOCTYPE html>
<!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]-->
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>airflow.contrib.executors.kubernetes_executor &mdash; Airflow Documentation</title>
<link rel="stylesheet" href="../../../../_static/css/theme.css" type="text/css" />
<link rel="stylesheet" href="../../../../_static/pygments.css" type="text/css" />
<link rel="index" title="Index" href="../../../../genindex.html" />
<link rel="search" title="Search" href="../../../../search.html" />
<script src="../../../../_static/js/modernizr.min.js"></script>
</head>
<body class="wy-body-for-nav">
<div class="wy-grid-for-nav">
<nav data-toggle="wy-nav-shift" class="wy-nav-side">
<div class="wy-side-scroll">
<div class="wy-side-nav-search">
<a href="../../../../index.html" class="icon icon-home"> Airflow
</a>
<div class="version">
1.10.2
</div>
<div role="search">
<form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get">
<input type="text" name="q" placeholder="Search docs" />
<input type="hidden" name="check_keywords" value="yes" />
<input type="hidden" name="area" value="default" />
</form>
</div>
</div>
<div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation">
<ul>
<li class="toctree-l1"><a class="reference internal" href="../../../../project.html">Project</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../license.html">License</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../start.html">Quick Start</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../installation.html">Installation</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../tutorial.html">Tutorial</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../howto/index.html">How-to Guides</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../ui.html">UI / Screenshots</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../concepts.html">Concepts</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../profiling.html">Data Profiling</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../cli.html">Command Line Interface</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../scheduler.html">Scheduling &amp; Triggers</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../plugins.html">Plugins</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../security.html">Security</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../timezone.html">Time zones</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../api.html">Experimental Rest API</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../integration.html">Integration</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../metrics.html">Metrics</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../kubernetes.html">Kubernetes</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../lineage.html">Lineage</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../changelog.html">Changelog</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../faq.html">FAQ</a></li>
<li class="toctree-l1"><a class="reference internal" href="../../../../code.html">API Reference</a></li>
</ul>
</div>
</div>
</nav>
<section data-toggle="wy-nav-shift" class="wy-nav-content-wrap">
<nav class="wy-nav-top" aria-label="top navigation">
<i data-toggle="wy-nav-top" class="fa fa-bars"></i>
<a href="../../../../index.html">Airflow</a>
</nav>
<div class="wy-nav-content">
<div class="rst-content">
<div role="navigation" aria-label="breadcrumbs navigation">
<ul class="wy-breadcrumbs">
<li><a href="../../../../index.html">Docs</a> &raquo;</li>
<li><a href="../../../index.html">Module code</a> &raquo;</li>
<li>airflow.contrib.executors.kubernetes_executor</li>
<li class="wy-breadcrumbs-aside">
</li>
</ul>
<hr/>
</div>
<div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article">
<div itemprop="articleBody">
<h1>Source code for airflow.contrib.executors.kubernetes_executor</h1><div class="highlight"><pre>
<span></span><span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span>
<span class="c1"># or more contributor license agreements. See the NOTICE file</span>
<span class="c1"># distributed with this work for additional information</span>
<span class="c1"># regarding copyright ownership. The ASF licenses this file</span>
<span class="c1"># to you under the Apache License, Version 2.0 (the</span>
<span class="c1"># &quot;License&quot;); you may not use this file except in compliance</span>
<span class="c1"># with the License. You may obtain a copy of the License at</span>
<span class="c1">#</span>
<span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span>
<span class="c1">#</span>
<span class="c1"># Unless required by applicable law or agreed to in writing,</span>
<span class="c1"># software distributed under the License is distributed on an</span>
<span class="c1"># &quot;AS IS&quot; BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span>
<span class="c1"># KIND, either express or implied. See the License for the</span>
<span class="c1"># specific language governing permissions and limitations</span>
<span class="c1"># under the License.</span>
<span class="kn">import</span> <span class="nn">base64</span>
<span class="kn">import</span> <span class="nn">json</span>
<span class="kn">import</span> <span class="nn">multiprocessing</span>
<span class="kn">from</span> <span class="nn">queue</span> <span class="k">import</span> <span class="n">Queue</span>
<span class="kn">from</span> <span class="nn">dateutil</span> <span class="k">import</span> <span class="n">parser</span>
<span class="kn">from</span> <span class="nn">uuid</span> <span class="k">import</span> <span class="n">uuid4</span>
<span class="kn">import</span> <span class="nn">kubernetes</span>
<span class="kn">from</span> <span class="nn">kubernetes</span> <span class="k">import</span> <span class="n">watch</span><span class="p">,</span> <span class="n">client</span>
<span class="kn">from</span> <span class="nn">kubernetes.client.rest</span> <span class="k">import</span> <span class="n">ApiException</span>
<span class="kn">from</span> <span class="nn">airflow.configuration</span> <span class="k">import</span> <span class="n">conf</span>
<span class="kn">from</span> <span class="nn">airflow.contrib.kubernetes.pod_launcher</span> <span class="k">import</span> <span class="n">PodLauncher</span>
<span class="kn">from</span> <span class="nn">airflow.contrib.kubernetes.kube_client</span> <span class="k">import</span> <span class="n">get_kube_client</span>
<span class="kn">from</span> <span class="nn">airflow.contrib.kubernetes.worker_configuration</span> <span class="k">import</span> <span class="n">WorkerConfiguration</span>
<span class="kn">from</span> <span class="nn">airflow.executors.base_executor</span> <span class="k">import</span> <span class="n">BaseExecutor</span>
<span class="kn">from</span> <span class="nn">airflow.executors</span> <span class="k">import</span> <span class="n">Executors</span>
<span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">TaskInstance</span><span class="p">,</span> <span class="n">KubeResourceVersion</span><span class="p">,</span> <span class="n">KubeWorkerIdentifier</span>
<span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span>
<span class="kn">from</span> <span class="nn">airflow.utils.db</span> <span class="k">import</span> <span class="n">provide_session</span><span class="p">,</span> <span class="n">create_session</span>
<span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">configuration</span>
<span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowConfigException</span><span class="p">,</span> <span class="n">AirflowException</span>
<span class="kn">from</span> <span class="nn">airflow.utils.log.logging_mixin</span> <span class="k">import</span> <span class="n">LoggingMixin</span>
<span class="k">class</span> <span class="nc">KubernetesExecutorConfig</span><span class="p">:</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">image</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">image_pull_policy</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">request_memory</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">request_cpu</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">limit_memory</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">limit_cpu</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">gcp_service_account_key</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">node_selectors</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">affinity</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span>
<span class="n">annotations</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">volumes</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">volume_mounts</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">tolerations</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">image</span> <span class="o">=</span> <span class="n">image</span>
<span class="bp">self</span><span class="o">.</span><span class="n">image_pull_policy</span> <span class="o">=</span> <span class="n">image_pull_policy</span>
<span class="bp">self</span><span class="o">.</span><span class="n">request_memory</span> <span class="o">=</span> <span class="n">request_memory</span>
<span class="bp">self</span><span class="o">.</span><span class="n">request_cpu</span> <span class="o">=</span> <span class="n">request_cpu</span>
<span class="bp">self</span><span class="o">.</span><span class="n">limit_memory</span> <span class="o">=</span> <span class="n">limit_memory</span>
<span class="bp">self</span><span class="o">.</span><span class="n">limit_cpu</span> <span class="o">=</span> <span class="n">limit_cpu</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gcp_service_account_key</span> <span class="o">=</span> <span class="n">gcp_service_account_key</span>
<span class="bp">self</span><span class="o">.</span><span class="n">node_selectors</span> <span class="o">=</span> <span class="n">node_selectors</span>
<span class="bp">self</span><span class="o">.</span><span class="n">affinity</span> <span class="o">=</span> <span class="n">affinity</span>
<span class="bp">self</span><span class="o">.</span><span class="n">annotations</span> <span class="o">=</span> <span class="n">annotations</span>
<span class="bp">self</span><span class="o">.</span><span class="n">volumes</span> <span class="o">=</span> <span class="n">volumes</span>
<span class="bp">self</span><span class="o">.</span><span class="n">volume_mounts</span> <span class="o">=</span> <span class="n">volume_mounts</span>
<span class="bp">self</span><span class="o">.</span><span class="n">tolerations</span> <span class="o">=</span> <span class="n">tolerations</span>
<span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="s2">&quot;</span><span class="si">{}</span><span class="s2">(image=</span><span class="si">{}</span><span class="s2">, image_pull_policy=</span><span class="si">{}</span><span class="s2">, request_memory=</span><span class="si">{}</span><span class="s2">, request_cpu=</span><span class="si">{}</span><span class="s2">, &quot;</span> \
<span class="s2">&quot;limit_memory=</span><span class="si">{}</span><span class="s2">, limit_cpu=</span><span class="si">{}</span><span class="s2">, gcp_service_account_key=</span><span class="si">{}</span><span class="s2">, &quot;</span> \
<span class="s2">&quot;node_selectors=</span><span class="si">{}</span><span class="s2">, affinity=</span><span class="si">{}</span><span class="s2">, annotations=</span><span class="si">{}</span><span class="s2">, volumes=</span><span class="si">{}</span><span class="s2">, &quot;</span> \
<span class="s2">&quot;volume_mounts=</span><span class="si">{}</span><span class="s2">, tolerations=</span><span class="si">{}</span><span class="s2">)&quot;</span> \
<span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">KubernetesExecutorConfig</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">image</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">image_pull_policy</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">request_memory</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_cpu</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">limit_memory</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">limit_cpu</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">gcp_service_account_key</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">node_selectors</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">affinity</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">annotations</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">volumes</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">volume_mounts</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">tolerations</span><span class="p">)</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">from_dict</span><span class="p">(</span><span class="n">obj</span><span class="p">):</span>
<span class="k">if</span> <span class="n">obj</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span>
<span class="k">return</span> <span class="n">KubernetesExecutorConfig</span><span class="p">()</span>
<span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span>
<span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span>
<span class="s1">&#39;Cannot convert a non-dictionary object into a KubernetesExecutorConfig&#39;</span><span class="p">)</span>
<span class="n">namespaced</span> <span class="o">=</span> <span class="n">obj</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">Executors</span><span class="o">.</span><span class="n">KubernetesExecutor</span><span class="p">,</span> <span class="p">{})</span>
<span class="k">return</span> <span class="n">KubernetesExecutorConfig</span><span class="p">(</span>
<span class="n">image</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;image&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span>
<span class="n">image_pull_policy</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;image_pull_policy&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span>
<span class="n">request_memory</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;request_memory&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span>
<span class="n">request_cpu</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;request_cpu&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span>
<span class="n">limit_memory</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;limit_memory&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span>
<span class="n">limit_cpu</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;limit_cpu&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span>
<span class="n">gcp_service_account_key</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;gcp_service_account_key&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span>
<span class="n">node_selectors</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;node_selectors&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span>
<span class="n">affinity</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;affinity&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span>
<span class="n">annotations</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;annotations&#39;</span><span class="p">,</span> <span class="p">{}),</span>
<span class="n">volumes</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;volumes&#39;</span><span class="p">,</span> <span class="p">[]),</span>
<span class="n">volume_mounts</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;volume_mounts&#39;</span><span class="p">,</span> <span class="p">[]),</span>
<span class="n">tolerations</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;tolerations&#39;</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span>
<span class="p">)</span>
<span class="k">def</span> <span class="nf">as_dict</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">return</span> <span class="p">{</span>
<span class="s1">&#39;image&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">image</span><span class="p">,</span>
<span class="s1">&#39;image_pull_policy&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">image_pull_policy</span><span class="p">,</span>
<span class="s1">&#39;request_memory&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_memory</span><span class="p">,</span>
<span class="s1">&#39;request_cpu&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_cpu</span><span class="p">,</span>
<span class="s1">&#39;limit_memory&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">limit_memory</span><span class="p">,</span>
<span class="s1">&#39;limit_cpu&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">limit_cpu</span><span class="p">,</span>
<span class="s1">&#39;gcp_service_account_key&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">gcp_service_account_key</span><span class="p">,</span>
<span class="s1">&#39;node_selectors&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">node_selectors</span><span class="p">,</span>
<span class="s1">&#39;affinity&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">affinity</span><span class="p">,</span>
<span class="s1">&#39;annotations&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">annotations</span><span class="p">,</span>
<span class="s1">&#39;volumes&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">volumes</span><span class="p">,</span>
<span class="s1">&#39;volume_mounts&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">volume_mounts</span><span class="p">,</span>
<span class="s1">&#39;tolerations&#39;</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">tolerations</span><span class="p">,</span>
<span class="p">}</span>
<span class="k">class</span> <span class="nc">KubeConfig</span><span class="p">:</span>
<span class="n">core_section</span> <span class="o">=</span> <span class="s1">&#39;core&#39;</span>
<span class="n">kubernetes_section</span> <span class="o">=</span> <span class="s1">&#39;kubernetes&#39;</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">configuration_dict</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">as_dict</span><span class="p">(</span><span class="n">display_sensitive</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">core_configuration</span> <span class="o">=</span> <span class="n">configuration_dict</span><span class="p">[</span><span class="s1">&#39;core&#39;</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_secrets</span> <span class="o">=</span> <span class="n">configuration_dict</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;kubernetes_secrets&#39;</span><span class="p">,</span> <span class="p">{})</span>
<span class="bp">self</span><span class="o">.</span><span class="n">airflow_home</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">core_section</span><span class="p">,</span> <span class="s1">&#39;airflow_home&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dags_folder</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">core_section</span><span class="p">,</span> <span class="s1">&#39;dags_folder&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">parallelism</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">getint</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">core_section</span><span class="p">,</span> <span class="s1">&#39;PARALLELISM&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">worker_container_repository</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;worker_container_repository&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">worker_container_tag</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;worker_container_tag&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_image</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="si">{}</span><span class="s1">:</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">worker_container_repository</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">worker_container_tag</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_image_pull_policy</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s2">&quot;worker_container_image_pull_policy&quot;</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_node_selectors</span> <span class="o">=</span> <span class="n">configuration_dict</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;kubernetes_node_selectors&#39;</span><span class="p">,</span> <span class="p">{})</span>
<span class="bp">self</span><span class="o">.</span><span class="n">delete_worker_pods</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;delete_worker_pods&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">worker_service_account_name</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;worker_service_account_name&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">image_pull_secrets</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;image_pull_secrets&#39;</span><span class="p">)</span>
<span class="c1"># NOTE: user can build the dags into the docker image directly,</span>
<span class="c1"># this will set to True if so</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dags_in_image</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;dags_in_image&#39;</span><span class="p">)</span>
<span class="c1"># NOTE: `git_repo` and `git_branch` must be specified together as a pair</span>
<span class="c1"># The http URL of the git repository to clone from</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_repo</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;git_repo&#39;</span><span class="p">)</span>
<span class="c1"># The branch of the repository to be checked out</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_branch</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;git_branch&#39;</span><span class="p">)</span>
<span class="c1"># Optionally, the directory in the git repository containing the dags</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_subpath</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;git_subpath&#39;</span><span class="p">)</span>
<span class="c1"># Optionally, the root directory for git operations</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_sync_root</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;git_sync_root&#39;</span><span class="p">)</span>
<span class="c1"># Optionally, the name at which to publish the checked-out files under --root</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_sync_dest</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;git_sync_dest&#39;</span><span class="p">)</span>
<span class="c1"># Optionally, if git_dags_folder_mount_point is set the worker will use</span>
<span class="c1"># {git_dags_folder_mount_point}/{git_sync_dest}/{git_subpath} as dags_folder</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_dags_folder_mount_point</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span>
<span class="s1">&#39;git_dags_folder_mount_point&#39;</span><span class="p">)</span>
<span class="c1"># Optionally a user may supply a `git_user` and `git_password` for private</span>
<span class="c1"># repositories</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_user</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;git_user&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_password</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;git_password&#39;</span><span class="p">)</span>
<span class="c1"># NOTE: The user may optionally use a volume claim to mount a PV containing</span>
<span class="c1"># DAGs directly</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dags_volume_claim</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;dags_volume_claim&#39;</span><span class="p">)</span>
<span class="c1"># This prop may optionally be set for PV Claims and is used to write logs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">logs_volume_claim</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;logs_volume_claim&#39;</span><span class="p">)</span>
<span class="c1"># This prop may optionally be set for PV Claims and is used to locate DAGs</span>
<span class="c1"># on a SubPath</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dags_volume_subpath</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;dags_volume_subpath&#39;</span><span class="p">)</span>
<span class="c1"># This prop may optionally be set for PV Claims and is used to locate logs</span>
<span class="c1"># on a SubPath</span>
<span class="bp">self</span><span class="o">.</span><span class="n">logs_volume_subpath</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;logs_volume_subpath&#39;</span><span class="p">)</span>
<span class="c1"># Optionally, hostPath volume containing DAGs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">dags_volume_host</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;dags_volume_host&#39;</span><span class="p">)</span>
<span class="c1"># Optionally, write logs to a hostPath Volume</span>
<span class="bp">self</span><span class="o">.</span><span class="n">logs_volume_host</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;logs_volume_host&#39;</span><span class="p">)</span>
<span class="c1"># This prop may optionally be set for PV Claims and is used to write logs</span>
<span class="bp">self</span><span class="o">.</span><span class="n">base_log_folder</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">core_section</span><span class="p">,</span> <span class="s1">&#39;base_log_folder&#39;</span><span class="p">)</span>
<span class="c1"># The Kubernetes Namespace in which the Scheduler and Webserver reside. Note</span>
<span class="c1"># that if your</span>
<span class="c1"># cluster has RBAC enabled, your scheduler may need service account permissions to</span>
<span class="c1"># create, watch, get, and delete pods in this namespace.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_namespace</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;namespace&#39;</span><span class="p">)</span>
<span class="c1"># The Kubernetes Namespace in which pods will be created by the executor. Note</span>
<span class="c1"># that if your</span>
<span class="c1"># cluster has RBAC enabled, your workers may need service account permissions to</span>
<span class="c1"># interact with cluster components.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">executor_namespace</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;namespace&#39;</span><span class="p">)</span>
<span class="c1"># Task secrets managed by KubernetesExecutor.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">gcp_service_account_keys</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span>
<span class="s1">&#39;gcp_service_account_keys&#39;</span><span class="p">)</span>
<span class="c1"># If the user is using the git-sync container to clone their repository via git,</span>
<span class="c1"># allow them to specify repository, tag, and pod name for the init container.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_sync_container_repository</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;git_sync_container_repository&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_sync_container_tag</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;git_sync_container_tag&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_sync_container</span> <span class="o">=</span> <span class="s1">&#39;</span><span class="si">{}</span><span class="s1">:</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_sync_container_repository</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">git_sync_container_tag</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">git_sync_init_container_name</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;git_sync_init_container_name&#39;</span><span class="p">)</span>
<span class="c1"># The worker pod may optionally have a valid Airflow config loaded via a</span>
<span class="c1"># configmap</span>
<span class="bp">self</span><span class="o">.</span><span class="n">airflow_configmap</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;airflow_configmap&#39;</span><span class="p">)</span>
<span class="n">affinity_json</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;affinity&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">affinity_json</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_affinity</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">affinity_json</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_affinity</span> <span class="o">=</span> <span class="kc">None</span>
<span class="n">tolerations_json</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">&#39;tolerations&#39;</span><span class="p">)</span>
<span class="k">if</span> <span class="n">tolerations_json</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_tolerations</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">tolerations_json</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_tolerations</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_validate</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">_validate</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="c1"># TODO: use XOR for dags_volume_claim and git_dags_folder_mount_point</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags_volume_claim</span> \
<span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags_volume_host</span> \
<span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags_in_image</span> \
<span class="ow">and</span> <span class="p">(</span><span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">git_repo</span> <span class="ow">or</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">git_branch</span> <span class="ow">or</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">git_dags_folder_mount_point</span><span class="p">):</span>
<span class="k">raise</span> <span class="n">AirflowConfigException</span><span class="p">(</span>
<span class="s1">&#39;In kubernetes mode the following must be set in the `kubernetes` &#39;</span>
<span class="s1">&#39;config section: `dags_volume_claim` &#39;</span>
<span class="s1">&#39;or `dags_volume_host` &#39;</span>
<span class="s1">&#39;or `dags_in_image` &#39;</span>
<span class="s1">&#39;or `git_repo and git_branch and git_dags_folder_mount_point`&#39;</span><span class="p">)</span>
<span class="k">class</span> <span class="nc">KubernetesJobWatcher</span><span class="p">(</span><span class="n">multiprocessing</span><span class="o">.</span><span class="n">Process</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">,</span> <span class="nb">object</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">namespace</span><span class="p">,</span> <span class="n">watcher_queue</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">,</span> <span class="n">worker_uuid</span><span class="p">):</span>
<span class="n">multiprocessing</span><span class="o">.</span><span class="n">Process</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">namespace</span> <span class="o">=</span> <span class="n">namespace</span>
<span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span> <span class="o">=</span> <span class="n">worker_uuid</span>
<span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span> <span class="o">=</span> <span class="n">watcher_queue</span>
<span class="bp">self</span><span class="o">.</span><span class="n">resource_version</span> <span class="o">=</span> <span class="n">resource_version</span>
<span class="k">def</span> <span class="nf">run</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">kube_client</span> <span class="o">=</span> <span class="n">get_kube_client</span><span class="p">()</span>
<span class="k">while</span> <span class="kc">True</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">resource_version</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_run</span><span class="p">(</span><span class="n">kube_client</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">resource_version</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s1">&#39;Unknown error in KubernetesJobWatcher. Failing&#39;</span><span class="p">)</span>
<span class="k">raise</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span><span class="s1">&#39;Watch died gracefully, starting back up with: &#39;</span>
<span class="s1">&#39;last resource_version: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">resource_version</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_run</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">kube_client</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">,</span> <span class="n">worker_uuid</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Event: and now my watch begins starting at resource_version: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">resource_version</span>
<span class="p">)</span>
<span class="n">watcher</span> <span class="o">=</span> <span class="n">watch</span><span class="o">.</span><span class="n">Watch</span><span class="p">()</span>
<span class="n">kwargs</span> <span class="o">=</span> <span class="p">{</span><span class="s1">&#39;label_selector&#39;</span><span class="p">:</span> <span class="s1">&#39;airflow-worker=</span><span class="si">{}</span><span class="s1">&#39;</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">worker_uuid</span><span class="p">)}</span>
<span class="k">if</span> <span class="n">resource_version</span><span class="p">:</span>
<span class="n">kwargs</span><span class="p">[</span><span class="s1">&#39;resource_version&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">resource_version</span>
<span class="n">last_resource_version</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">watcher</span><span class="o">.</span><span class="n">stream</span><span class="p">(</span><span class="n">kube_client</span><span class="o">.</span><span class="n">list_namespaced_pod</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">namespace</span><span class="p">,</span>
<span class="o">**</span><span class="n">kwargs</span><span class="p">):</span>
<span class="n">task</span> <span class="o">=</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;object&#39;</span><span class="p">]</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Event: </span><span class="si">%s</span><span class="s1"> had an event of type </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">task</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">name</span><span class="p">,</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span>
<span class="p">)</span>
<span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;type&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="s1">&#39;ERROR&#39;</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">process_error</span><span class="p">(</span><span class="n">event</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">process_status</span><span class="p">(</span>
<span class="n">task</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">name</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">status</span><span class="o">.</span><span class="n">phase</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">labels</span><span class="p">,</span>
<span class="n">task</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">resource_version</span>
<span class="p">)</span>
<span class="n">last_resource_version</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">resource_version</span>
<span class="k">return</span> <span class="n">last_resource_version</span>
<span class="k">def</span> <span class="nf">process_error</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span>
<span class="s1">&#39;Encountered Error response from k8s list namespaced pod stream =&gt; </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">event</span>
<span class="p">)</span>
<span class="n">raw_object</span> <span class="o">=</span> <span class="n">event</span><span class="p">[</span><span class="s1">&#39;raw_object&#39;</span><span class="p">]</span>
<span class="k">if</span> <span class="n">raw_object</span><span class="p">[</span><span class="s1">&#39;code&#39;</span><span class="p">]</span> <span class="o">==</span> <span class="mi">410</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Kubernetes resource version is too old, must reset to 0 =&gt; </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">raw_object</span><span class="p">[</span><span class="s1">&#39;message&#39;</span><span class="p">]</span>
<span class="p">)</span>
<span class="c1"># Return resource version 0</span>
<span class="k">return</span> <span class="s1">&#39;0&#39;</span>
<span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span>
<span class="s1">&#39;Kubernetes failure for </span><span class="si">%s</span><span class="s1"> with code </span><span class="si">%s</span><span class="s1"> and message: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">raw_object</span><span class="p">[</span><span class="s1">&#39;reason&#39;</span><span class="p">],</span> <span class="n">raw_object</span><span class="p">[</span><span class="s1">&#39;code&#39;</span><span class="p">],</span> <span class="n">raw_object</span><span class="p">[</span><span class="s1">&#39;message&#39;</span><span class="p">]</span>
<span class="p">)</span>
<span class="k">def</span> <span class="nf">process_status</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">,</span> <span class="n">status</span><span class="p">,</span> <span class="n">labels</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">):</span>
<span class="k">if</span> <span class="n">status</span> <span class="o">==</span> <span class="s1">&#39;Pending&#39;</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Event: </span><span class="si">%s</span><span class="s1"> Pending&#39;</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span>
<span class="k">elif</span> <span class="n">status</span> <span class="o">==</span> <span class="s1">&#39;Failed&#39;</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Event: </span><span class="si">%s</span><span class="s1"> Failed&#39;</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span><span class="o">.</span><span class="n">put</span><span class="p">((</span><span class="n">pod_id</span><span class="p">,</span> <span class="n">State</span><span class="o">.</span><span class="n">FAILED</span><span class="p">,</span> <span class="n">labels</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">))</span>
<span class="k">elif</span> <span class="n">status</span> <span class="o">==</span> <span class="s1">&#39;Succeeded&#39;</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Event: </span><span class="si">%s</span><span class="s1"> Succeeded&#39;</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span><span class="o">.</span><span class="n">put</span><span class="p">((</span><span class="n">pod_id</span><span class="p">,</span> <span class="kc">None</span><span class="p">,</span> <span class="n">labels</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">))</span>
<span class="k">elif</span> <span class="n">status</span> <span class="o">==</span> <span class="s1">&#39;Running&#39;</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Event: </span><span class="si">%s</span><span class="s1"> is Running&#39;</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span>
<span class="s1">&#39;Event: Invalid state: </span><span class="si">%s</span><span class="s1"> on pod: </span><span class="si">%s</span><span class="s1"> with labels: </span><span class="si">%s</span><span class="s1"> with &#39;</span>
<span class="s1">&#39;resource_version: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">status</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">,</span> <span class="n">labels</span><span class="p">,</span> <span class="n">resource_version</span>
<span class="p">)</span>
<span class="k">class</span> <span class="nc">AirflowKubernetesScheduler</span><span class="p">(</span><span class="n">LoggingMixin</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">kube_config</span><span class="p">,</span> <span class="n">task_queue</span><span class="p">,</span> <span class="n">result_queue</span><span class="p">,</span> <span class="n">kube_client</span><span class="p">,</span> <span class="n">worker_uuid</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Creating Kubernetes executor&quot;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span> <span class="o">=</span> <span class="n">kube_config</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span> <span class="o">=</span> <span class="n">task_queue</span>
<span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span> <span class="o">=</span> <span class="n">result_queue</span>
<span class="bp">self</span><span class="o">.</span><span class="n">namespace</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_namespace</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Kubernetes using namespace </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">namespace</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span> <span class="o">=</span> <span class="n">kube_client</span>
<span class="bp">self</span><span class="o">.</span><span class="n">launcher</span> <span class="o">=</span> <span class="n">PodLauncher</span><span class="p">(</span><span class="n">kube_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">worker_configuration</span> <span class="o">=</span> <span class="n">WorkerConfiguration</span><span class="p">(</span><span class="n">kube_config</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span> <span class="o">=</span> <span class="n">multiprocessing</span><span class="o">.</span><span class="n">Queue</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span> <span class="o">=</span> <span class="n">worker_uuid</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_watcher</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_make_kube_watcher</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">_make_kube_watcher</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">resource_version</span> <span class="o">=</span> <span class="n">KubeResourceVersion</span><span class="o">.</span><span class="n">get_current_resource_version</span><span class="p">()</span>
<span class="n">watcher</span> <span class="o">=</span> <span class="n">KubernetesJobWatcher</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">namespace</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span><span class="p">,</span>
<span class="n">resource_version</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span><span class="p">)</span>
<span class="n">watcher</span><span class="o">.</span><span class="n">start</span><span class="p">()</span>
<span class="k">return</span> <span class="n">watcher</span>
<span class="k">def</span> <span class="nf">_health_check_kube_watcher</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_watcher</span><span class="o">.</span><span class="n">is_alive</span><span class="p">():</span>
<span class="k">pass</span>
<span class="k">else</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span>
<span class="s1">&#39;Error while health checking kube watcher process. &#39;</span>
<span class="s1">&#39;Process died for unknown reasons&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_watcher</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_make_kube_watcher</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">run_next</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">next_job</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> The run_next command will check the task_queue for any un-run jobs.</span>
<span class="sd"> It will then create a unique job-id, launch that job in the cluster,</span>
<span class="sd"> and store relevant info in the current_jobs map so we can track the job&#39;s</span>
<span class="sd"> status</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Kubernetes job is </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">next_job</span><span class="p">))</span>
<span class="n">key</span><span class="p">,</span> <span class="n">command</span><span class="p">,</span> <span class="n">kube_executor_config</span> <span class="o">=</span> <span class="n">next_job</span>
<span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">,</span> <span class="n">try_number</span> <span class="o">=</span> <span class="n">key</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Kubernetes running for command </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">command</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Kubernetes launching image </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_image</span><span class="p">)</span>
<span class="n">pod</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">worker_configuration</span><span class="o">.</span><span class="n">make_pod</span><span class="p">(</span>
<span class="n">namespace</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">namespace</span><span class="p">,</span> <span class="n">worker_uuid</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span><span class="p">,</span>
<span class="n">pod_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_create_pod_id</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="p">),</span>
<span class="n">dag_id</span><span class="o">=</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="o">=</span><span class="n">task_id</span><span class="p">,</span> <span class="n">try_number</span><span class="o">=</span><span class="n">try_number</span><span class="p">,</span>
<span class="n">execution_date</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_datetime_to_label_safe_datestring</span><span class="p">(</span><span class="n">execution_date</span><span class="p">),</span>
<span class="n">airflow_command</span><span class="o">=</span><span class="n">command</span><span class="p">,</span> <span class="n">kube_executor_config</span><span class="o">=</span><span class="n">kube_executor_config</span>
<span class="p">)</span>
<span class="c1"># the watcher will monitor pods, so we do not block.</span>
<span class="bp">self</span><span class="o">.</span><span class="n">launcher</span><span class="o">.</span><span class="n">run_pod_async</span><span class="p">(</span><span class="n">pod</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">&quot;Kubernetes Job created!&quot;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">delete_pod</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">delete_worker_pods</span><span class="p">:</span>
<span class="k">try</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="o">.</span><span class="n">delete_namespaced_pod</span><span class="p">(</span>
<span class="n">pod_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">namespace</span><span class="p">,</span> <span class="n">body</span><span class="o">=</span><span class="n">client</span><span class="o">.</span><span class="n">V1DeleteOptions</span><span class="p">())</span>
<span class="k">except</span> <span class="n">ApiException</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="c1"># If the pod is already deleted</span>
<span class="k">if</span> <span class="n">e</span><span class="o">.</span><span class="n">status</span> <span class="o">!=</span> <span class="mi">404</span><span class="p">:</span>
<span class="k">raise</span>
<span class="k">def</span> <span class="nf">sync</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> The sync function checks the status of all currently running kubernetes jobs.</span>
<span class="sd"> If a job is completed, it&#39;s status is placed in the result queue to</span>
<span class="sd"> be sent back to the scheduler.</span>
<span class="sd"> :return:</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_health_check_kube_watcher</span><span class="p">()</span>
<span class="k">while</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span><span class="o">.</span><span class="n">empty</span><span class="p">():</span>
<span class="bp">self</span><span class="o">.</span><span class="n">process_watcher_task</span><span class="p">()</span>
<span class="k">def</span> <span class="nf">process_watcher_task</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="n">pod_id</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">labels</span><span class="p">,</span> <span class="n">resource_version</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Attempting to finish pod; pod_id: </span><span class="si">%s</span><span class="s1">; state: </span><span class="si">%s</span><span class="s1">; labels: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">pod_id</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">labels</span>
<span class="p">)</span>
<span class="n">key</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_labels_to_key</span><span class="p">(</span><span class="n">labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span>
<span class="k">if</span> <span class="n">key</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;finishing job </span><span class="si">%s</span><span class="s1"> - </span><span class="si">%s</span><span class="s1"> (</span><span class="si">%s</span><span class="s1">)&#39;</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span><span class="o">.</span><span class="n">put</span><span class="p">((</span><span class="n">key</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">))</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_strip_unsafe_kubernetes_special_chars</span><span class="p">(</span><span class="n">string</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Kubernetes only supports lowercase alphanumeric characters and &quot;-&quot; and &quot;.&quot; in</span>
<span class="sd"> the pod name</span>
<span class="sd"> However, there are special rules about how &quot;-&quot; and &quot;.&quot; can be used so let&#39;s</span>
<span class="sd"> only keep</span>
<span class="sd"> alphanumeric chars see here for detail:</span>
<span class="sd"> https://kubernetes.io/docs/concepts/overview/working-with-objects/names/</span>
<span class="sd"> :param string: The requested Pod name</span>
<span class="sd"> :return: ``str`` Pod name stripped of any unsafe characters</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="s1">&#39;&#39;</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">ch</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span> <span class="k">for</span> <span class="n">ind</span><span class="p">,</span> <span class="n">ch</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">string</span><span class="p">)</span> <span class="k">if</span> <span class="n">ch</span><span class="o">.</span><span class="n">isalnum</span><span class="p">())</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_make_safe_pod_id</span><span class="p">(</span><span class="n">safe_dag_id</span><span class="p">,</span> <span class="n">safe_task_id</span><span class="p">,</span> <span class="n">safe_uuid</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Kubernetes pod names must be &lt;= 253 chars and must pass the following regex for</span>
<span class="sd"> validation</span>
<span class="sd"> &quot;^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$&quot;</span>
<span class="sd"> :param safe_dag_id: a dag_id with only alphanumeric characters</span>
<span class="sd"> :param safe_task_id: a task_id with only alphanumeric characters</span>
<span class="sd"> :param random_uuid: a uuid</span>
<span class="sd"> :return: ``str`` valid Pod name of appropriate length</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">MAX_POD_ID_LEN</span> <span class="o">=</span> <span class="mi">253</span>
<span class="n">safe_key</span> <span class="o">=</span> <span class="n">safe_dag_id</span> <span class="o">+</span> <span class="n">safe_task_id</span>
<span class="n">safe_pod_id</span> <span class="o">=</span> <span class="n">safe_key</span><span class="p">[:</span><span class="n">MAX_POD_ID_LEN</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="n">safe_uuid</span><span class="p">)</span> <span class="o">-</span> <span class="mi">1</span><span class="p">]</span> <span class="o">+</span> <span class="s2">&quot;-&quot;</span> <span class="o">+</span> <span class="n">safe_uuid</span>
<span class="k">return</span> <span class="n">safe_pod_id</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_create_pod_id</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="p">):</span>
<span class="n">safe_dag_id</span> <span class="o">=</span> <span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_strip_unsafe_kubernetes_special_chars</span><span class="p">(</span>
<span class="n">dag_id</span><span class="p">)</span>
<span class="n">safe_task_id</span> <span class="o">=</span> <span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_strip_unsafe_kubernetes_special_chars</span><span class="p">(</span>
<span class="n">task_id</span><span class="p">)</span>
<span class="n">safe_uuid</span> <span class="o">=</span> <span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_strip_unsafe_kubernetes_special_chars</span><span class="p">(</span>
<span class="n">uuid4</span><span class="p">()</span><span class="o">.</span><span class="n">hex</span><span class="p">)</span>
<span class="k">return</span> <span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_make_safe_pod_id</span><span class="p">(</span><span class="n">safe_dag_id</span><span class="p">,</span> <span class="n">safe_task_id</span><span class="p">,</span>
<span class="n">safe_uuid</span><span class="p">)</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_label_safe_datestring_to_datetime</span><span class="p">(</span><span class="n">string</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Kubernetes doesn&#39;t permit &quot;:&quot; in labels. ISO datetime format uses &quot;:&quot; but not</span>
<span class="sd"> &quot;_&quot;, let&#39;s</span>
<span class="sd"> replace &quot;:&quot; with &quot;_&quot;</span>
<span class="sd"> :param string: string</span>
<span class="sd"> :return: datetime.datetime object</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse</span><span class="p">(</span><span class="n">string</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;_plus_&#39;</span><span class="p">,</span> <span class="s1">&#39;+&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s2">&quot;_&quot;</span><span class="p">,</span> <span class="s2">&quot;:&quot;</span><span class="p">))</span>
<span class="nd">@staticmethod</span>
<span class="k">def</span> <span class="nf">_datetime_to_label_safe_datestring</span><span class="p">(</span><span class="n">datetime_obj</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> Kubernetes doesn&#39;t like &quot;:&quot; in labels, since ISO datetime format uses &quot;:&quot; but</span>
<span class="sd"> not &quot;_&quot; let&#39;s</span>
<span class="sd"> replace &quot;:&quot; with &quot;_&quot;</span>
<span class="sd"> :param datetime_obj: datetime.datetime object</span>
<span class="sd"> :return: ISO-like string representing the datetime</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="k">return</span> <span class="n">datetime_obj</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s2">&quot;:&quot;</span><span class="p">,</span> <span class="s2">&quot;_&quot;</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">&#39;+&#39;</span><span class="p">,</span> <span class="s1">&#39;_plus_&#39;</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">_labels_to_key</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">labels</span><span class="p">):</span>
<span class="n">try_num</span> <span class="o">=</span> <span class="mi">1</span>
<span class="k">try</span><span class="p">:</span>
<span class="n">try_num</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">labels</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;try_number&#39;</span><span class="p">,</span> <span class="s1">&#39;1&#39;</span><span class="p">))</span>
<span class="k">except</span> <span class="ne">ValueError</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span><span class="s2">&quot;could not get try_number as an int: </span><span class="si">%s</span><span class="s2">&quot;</span><span class="p">,</span> <span class="n">labels</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">&#39;try_number&#39;</span><span class="p">,</span> <span class="s1">&#39;1&#39;</span><span class="p">))</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">return</span> <span class="p">(</span>
<span class="n">labels</span><span class="p">[</span><span class="s1">&#39;dag_id&#39;</span><span class="p">],</span> <span class="n">labels</span><span class="p">[</span><span class="s1">&#39;task_id&#39;</span><span class="p">],</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_label_safe_datestring_to_datetime</span><span class="p">(</span><span class="n">labels</span><span class="p">[</span><span class="s1">&#39;execution_date&#39;</span><span class="p">]),</span>
<span class="n">try_num</span><span class="p">,</span>
<span class="p">)</span>
<span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span>
<span class="s1">&#39;Error while converting labels to key; labels: </span><span class="si">%s</span><span class="s1">; exception: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">labels</span><span class="p">,</span> <span class="n">e</span>
<span class="p">)</span>
<span class="k">return</span> <span class="kc">None</span>
<div class="viewcode-block" id="KubernetesExecutor"><a class="viewcode-back" href="../../../../code.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor">[docs]</a><span class="k">class</span> <span class="nc">KubernetesExecutor</span><span class="p">(</span><span class="n">BaseExecutor</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span> <span class="o">=</span> <span class="n">KubeConfig</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span> <span class="o">=</span> <span class="kc">None</span>
<span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span> <span class="o">=</span> <span class="kc">None</span>
<span class="nb">super</span><span class="p">(</span><span class="n">KubernetesExecutor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">parallelism</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">parallelism</span><span class="p">)</span>
<div class="viewcode-block" id="KubernetesExecutor.clear_not_launched_queued_tasks"><a class="viewcode-back" href="../../../../code.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.clear_not_launched_queued_tasks">[docs]</a> <span class="nd">@provide_session</span>
<span class="k">def</span> <span class="nf">clear_not_launched_queued_tasks</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="sd">&quot;&quot;&quot;</span>
<span class="sd"> If the airflow scheduler restarts with pending &quot;Queued&quot; tasks, the tasks may or</span>
<span class="sd"> may not</span>
<span class="sd"> have been launched Thus, on starting up the scheduler let&#39;s check every</span>
<span class="sd"> &quot;Queued&quot; task to</span>
<span class="sd"> see if it has been launched (ie: if there is a corresponding pod on kubernetes)</span>
<span class="sd"> If it has been launched then do nothing, otherwise reset the state to &quot;None&quot; so</span>
<span class="sd"> the task</span>
<span class="sd"> will be rescheduled</span>
<span class="sd"> This will not be necessary in a future version of airflow in which there is</span>
<span class="sd"> proper support</span>
<span class="sd"> for State.LAUNCHED</span>
<span class="sd"> &quot;&quot;&quot;</span>
<span class="n">queued_tasks</span> <span class="o">=</span> <span class="n">session</span>\
<span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TaskInstance</span><span class="p">)</span>\
<span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">QUEUED</span><span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;When executor started up, found </span><span class="si">%s</span><span class="s1"> queued task instances&#39;</span><span class="p">,</span>
<span class="nb">len</span><span class="p">(</span><span class="n">queued_tasks</span><span class="p">)</span>
<span class="p">)</span>
<span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">queued_tasks</span><span class="p">:</span>
<span class="n">dict_string</span> <span class="o">=</span> <span class="s2">&quot;dag_id=</span><span class="si">{}</span><span class="s2">,task_id=</span><span class="si">{}</span><span class="s2">,execution_date=</span><span class="si">{}</span><span class="s2">,airflow-worker=</span><span class="si">{}</span><span class="s2">&quot;</span> \
<span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span>
<span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_datetime_to_label_safe_datestring</span><span class="p">(</span>
<span class="n">task</span><span class="o">.</span><span class="n">execution_date</span><span class="p">),</span> <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span><span class="p">)</span>
<span class="n">kwargs</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">label_selector</span><span class="o">=</span><span class="n">dict_string</span><span class="p">)</span>
<span class="n">pod_list</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="o">.</span><span class="n">list_namespaced_pod</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_namespace</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span>
<span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">pod_list</span><span class="o">.</span><span class="n">items</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;TaskInstance: </span><span class="si">%s</span><span class="s1"> found in queued state but was not launched, &#39;</span>
<span class="s1">&#39;rescheduling&#39;</span><span class="p">,</span> <span class="n">task</span>
<span class="p">)</span>
<span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TaskInstance</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">task</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span>
<span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="n">task</span><span class="o">.</span><span class="n">execution_date</span>
<span class="p">)</span><span class="o">.</span><span class="n">update</span><span class="p">({</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">state</span><span class="p">:</span> <span class="n">State</span><span class="o">.</span><span class="n">NONE</span><span class="p">})</span></div>
<span class="k">def</span> <span class="nf">_inject_secrets</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">def</span> <span class="nf">_create_or_update_secret</span><span class="p">(</span><span class="n">secret_name</span><span class="p">,</span> <span class="n">secret_path</span><span class="p">):</span>
<span class="k">try</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="o">.</span><span class="n">create_namespaced_secret</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">executor_namespace</span><span class="p">,</span> <span class="n">kubernetes</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">V1Secret</span><span class="p">(</span>
<span class="n">data</span><span class="o">=</span><span class="p">{</span>
<span class="s1">&#39;key.json&#39;</span><span class="p">:</span> <span class="n">base64</span><span class="o">.</span><span class="n">b64encode</span><span class="p">(</span><span class="nb">open</span><span class="p">(</span><span class="n">secret_path</span><span class="p">,</span> <span class="s1">&#39;r&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">())},</span>
<span class="n">metadata</span><span class="o">=</span><span class="n">kubernetes</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">V1ObjectMeta</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="n">secret_name</span><span class="p">)))</span>
<span class="k">except</span> <span class="n">ApiException</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span>
<span class="k">if</span> <span class="n">e</span><span class="o">.</span><span class="n">status</span> <span class="o">==</span> <span class="mi">409</span><span class="p">:</span>
<span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="o">.</span><span class="n">replace_namespaced_secret</span><span class="p">(</span>
<span class="n">secret_name</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">executor_namespace</span><span class="p">,</span>
<span class="n">kubernetes</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">V1Secret</span><span class="p">(</span>
<span class="n">data</span><span class="o">=</span><span class="p">{</span><span class="s1">&#39;key.json&#39;</span><span class="p">:</span> <span class="n">base64</span><span class="o">.</span><span class="n">b64encode</span><span class="p">(</span>
<span class="nb">open</span><span class="p">(</span><span class="n">secret_path</span><span class="p">,</span> <span class="s1">&#39;r&#39;</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">())},</span>
<span class="n">metadata</span><span class="o">=</span><span class="n">kubernetes</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">V1ObjectMeta</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="n">secret_name</span><span class="p">)))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span>
<span class="s1">&#39;Exception while trying to inject secret. &#39;</span>
<span class="s1">&#39;Secret name: </span><span class="si">%s</span><span class="s1">, error details: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">secret_name</span><span class="p">,</span> <span class="n">e</span>
<span class="p">)</span>
<span class="k">raise</span>
<span class="c1"># For each GCP service account key, inject it as a secret in executor</span>
<span class="c1"># namespace with the specific secret name configured in the airflow.cfg.</span>
<span class="c1"># We let exceptions to pass through to users.</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">gcp_service_account_keys</span><span class="p">:</span>
<span class="n">name_path_pair_list</span> <span class="o">=</span> <span class="p">[</span>
<span class="p">{</span><span class="s1">&#39;name&#39;</span><span class="p">:</span> <span class="n">account_spec</span><span class="o">.</span><span class="n">strip</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;=&#39;</span><span class="p">)[</span><span class="mi">0</span><span class="p">],</span>
<span class="s1">&#39;path&#39;</span><span class="p">:</span> <span class="n">account_spec</span><span class="o">.</span><span class="n">strip</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;=&#39;</span><span class="p">)[</span><span class="mi">1</span><span class="p">]}</span>
<span class="k">for</span> <span class="n">account_spec</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">gcp_service_account_keys</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">&#39;,&#39;</span><span class="p">)]</span>
<span class="k">for</span> <span class="n">service_account</span> <span class="ow">in</span> <span class="n">name_path_pair_list</span><span class="p">:</span>
<span class="n">_create_or_update_secret</span><span class="p">(</span><span class="n">service_account</span><span class="p">[</span><span class="s1">&#39;name&#39;</span><span class="p">],</span> <span class="n">service_account</span><span class="p">[</span><span class="s1">&#39;path&#39;</span><span class="p">])</span>
<div class="viewcode-block" id="KubernetesExecutor.start"><a class="viewcode-back" href="../../../../code.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.start">[docs]</a> <span class="k">def</span> <span class="nf">start</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Start Kubernetes executor&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span> <span class="o">=</span> <span class="n">KubeWorkerIdentifier</span><span class="o">.</span><span class="n">get_or_create_current_kube_worker_uuid</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;Start with worker_uuid: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span><span class="p">)</span>
<span class="c1"># always need to reset resource version since we don&#39;t know</span>
<span class="c1"># when we last started, note for behavior below</span>
<span class="c1"># https://github.com/kubernetes-client/python/blob/master/kubernetes/docs</span>
<span class="c1"># /CoreV1Api.md#list_namespaced_pod</span>
<span class="n">KubeResourceVersion</span><span class="o">.</span><span class="n">reset_resource_version</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span> <span class="o">=</span> <span class="n">Queue</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span> <span class="o">=</span> <span class="n">Queue</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span> <span class="o">=</span> <span class="n">get_kube_client</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span> <span class="o">=</span> <span class="n">AirflowKubernetesScheduler</span><span class="p">(</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span><span class="p">,</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span>
<span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_inject_secrets</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">clear_not_launched_queued_tasks</span><span class="p">()</span></div>
<div class="viewcode-block" id="KubernetesExecutor.execute_async"><a class="viewcode-back" href="../../../../code.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.execute_async">[docs]</a> <span class="k">def</span> <span class="nf">execute_async</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">command</span><span class="p">,</span> <span class="n">queue</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">executor_config</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span>
<span class="s1">&#39;Add task </span><span class="si">%s</span><span class="s1"> with command </span><span class="si">%s</span><span class="s1"> with executor_config </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span>
<span class="n">key</span><span class="p">,</span> <span class="n">command</span><span class="p">,</span> <span class="n">executor_config</span>
<span class="p">)</span>
<span class="n">kube_executor_config</span> <span class="o">=</span> <span class="n">KubernetesExecutorConfig</span><span class="o">.</span><span class="n">from_dict</span><span class="p">(</span><span class="n">executor_config</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span><span class="o">.</span><span class="n">put</span><span class="p">((</span><span class="n">key</span><span class="p">,</span> <span class="n">command</span><span class="p">,</span> <span class="n">kube_executor_config</span><span class="p">))</span></div>
<div class="viewcode-block" id="KubernetesExecutor.sync"><a class="viewcode-back" href="../../../../code.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.sync">[docs]</a> <span class="k">def</span> <span class="nf">sync</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">running</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;self.running: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">running</span><span class="p">)</span>
<span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">queued_tasks</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;self.queued: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">queued_tasks</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span><span class="o">.</span><span class="n">sync</span><span class="p">()</span>
<span class="n">last_resource_version</span> <span class="o">=</span> <span class="kc">None</span>
<span class="k">while</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span><span class="o">.</span><span class="n">empty</span><span class="p">():</span>
<span class="n">results</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="n">key</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">,</span> <span class="n">resource_version</span> <span class="o">=</span> <span class="n">results</span>
<span class="n">last_resource_version</span> <span class="o">=</span> <span class="n">resource_version</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Changing state of </span><span class="si">%s</span><span class="s1"> to </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="n">results</span><span class="p">,</span> <span class="n">state</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">_change_state</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span>
<span class="n">KubeResourceVersion</span><span class="o">.</span><span class="n">checkpoint_resource_version</span><span class="p">(</span><span class="n">last_resource_version</span><span class="p">)</span>
<span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span><span class="o">.</span><span class="n">empty</span><span class="p">():</span>
<span class="n">key</span><span class="p">,</span> <span class="n">command</span><span class="p">,</span> <span class="n">kube_executor_config</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span><span class="o">.</span><span class="n">get</span><span class="p">()</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span><span class="o">.</span><span class="n">run_next</span><span class="p">((</span><span class="n">key</span><span class="p">,</span> <span class="n">command</span><span class="p">,</span> <span class="n">kube_executor_config</span><span class="p">))</span></div>
<span class="k">def</span> <span class="nf">_change_state</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">):</span>
<span class="k">if</span> <span class="n">state</span> <span class="o">!=</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span><span class="o">.</span><span class="n">delete_pod</span><span class="p">(</span><span class="n">pod_id</span><span class="p">)</span>
<span class="k">try</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Deleted pod: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">key</span><span class="p">))</span>
<span class="bp">self</span><span class="o">.</span><span class="n">running</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">key</span><span class="p">)</span>
<span class="k">except</span> <span class="ne">KeyError</span><span class="p">:</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">&#39;Could not find key: </span><span class="si">%s</span><span class="s1">&#39;</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">key</span><span class="p">))</span>
<span class="k">pass</span>
<span class="bp">self</span><span class="o">.</span><span class="n">event_buffer</span><span class="p">[</span><span class="n">key</span><span class="p">]</span> <span class="o">=</span> <span class="n">state</span>
<span class="p">(</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="p">,</span> <span class="n">ex_time</span><span class="p">,</span> <span class="n">try_number</span><span class="p">)</span> <span class="o">=</span> <span class="n">key</span>
<span class="k">with</span> <span class="n">create_session</span><span class="p">()</span> <span class="k">as</span> <span class="n">session</span><span class="p">:</span>
<span class="n">item</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TaskInstance</span><span class="p">)</span><span class="o">.</span><span class="n">filter_by</span><span class="p">(</span>
<span class="n">dag_id</span><span class="o">=</span><span class="n">dag_id</span><span class="p">,</span>
<span class="n">task_id</span><span class="o">=</span><span class="n">task_id</span><span class="p">,</span>
<span class="n">execution_date</span><span class="o">=</span><span class="n">ex_time</span>
<span class="p">)</span><span class="o">.</span><span class="n">one</span><span class="p">()</span>
<span class="k">if</span> <span class="n">state</span><span class="p">:</span>
<span class="n">item</span><span class="o">.</span><span class="n">state</span> <span class="o">=</span> <span class="n">state</span>
<span class="n">session</span><span class="o">.</span><span class="n">add</span><span class="p">(</span><span class="n">item</span><span class="p">)</span>
<div class="viewcode-block" id="KubernetesExecutor.end"><a class="viewcode-back" href="../../../../code.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.end">[docs]</a> <span class="k">def</span> <span class="nf">end</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span>
<span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">&#39;Shutting down Kubernetes executor&#39;</span><span class="p">)</span>
<span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span><span class="o">.</span><span class="n">join</span><span class="p">()</span></div></div>
</pre></div>
</div>
</div>
<footer>
<hr/>
<div role="contentinfo">
<p>
</p>
</div>
Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>.
</footer>
</div>
</div>
</section>
</div>
<script type="text/javascript" id="documentation_options" data-url_root="../../../../" src="../../../../_static/documentation_options.js"></script>
<script type="text/javascript" src="../../../../_static/jquery.js"></script>
<script type="text/javascript" src="../../../../_static/underscore.js"></script>
<script type="text/javascript" src="../../../../_static/doctools.js"></script>
<script type="text/javascript" src="../../../../_static/language_data.js"></script>
<script type="text/javascript" src="../../../../_static/js/theme.js"></script>
<script type="text/javascript">
jQuery(function () {
SphinxRtdTheme.Navigation.enable(true);
});
</script>
</body>
</html>