| |
| |
| |
| |
| <!DOCTYPE html> |
| <!--[if IE 8]><html class="no-js lt-ie9" lang="en" > <![endif]--> |
| <!--[if gt IE 8]><!--> <html class="no-js" lang="en" > <!--<![endif]--> |
| <head> |
| <meta charset="utf-8"> |
| |
| <meta name="viewport" content="width=device-width, initial-scale=1.0"> |
| |
| <title>airflow.contrib.executors.kubernetes_executor — Airflow Documentation</title> |
| |
| |
| |
| |
| |
| |
| |
| |
| <script type="text/javascript" src="../../../../_static/js/modernizr.min.js"></script> |
| |
| |
| <script type="text/javascript" id="documentation_options" data-url_root="../../../../" src="../../../../_static/documentation_options.js"></script> |
| <script type="text/javascript" src="../../../../_static/jquery.js"></script> |
| <script type="text/javascript" src="../../../../_static/underscore.js"></script> |
| <script type="text/javascript" src="../../../../_static/doctools.js"></script> |
| <script type="text/javascript" src="../../../../_static/language_data.js"></script> |
| |
| <script type="text/javascript" src="../../../../_static/js/theme.js"></script> |
| |
| |
| |
| |
| <link rel="stylesheet" href="../../../../_static/css/theme.css" type="text/css" /> |
| <link rel="stylesheet" href="../../../../_static/pygments.css" type="text/css" /> |
| <link rel="index" title="Index" href="../../../../genindex.html" /> |
| <link rel="search" title="Search" href="../../../../search.html" /> |
| |
| <script> |
| document.addEventListener('DOMContentLoaded', function() { |
| var el = document.getElementById('changelog'); |
| if (el !== null ) { |
| // [AIRFLOW-...] |
| el.innerHTML = el.innerHTML.replace( |
| /\[(AIRFLOW-[\d]+)\]/g, |
| `<a href="https://issues.apache.org/jira/browse/$1">[$1]</a>` |
| ); |
| // (#...) |
| el.innerHTML = el.innerHTML.replace( |
| /\(#([\d]+)\)/g, |
| `<a href="https://github.com/apache/airflow/pull/$1">(#$1)</a>` |
| ); |
| }; |
| }) |
| </script> |
| <style> |
| .example-header { |
| position: relative; |
| background: #9AAA7A; |
| padding: 8px 16px; |
| margin-bottom: 0; |
| } |
| .example-header--with-button { |
| padding-right: 166px; |
| } |
| .example-header:after{ |
| content: ''; |
| display: table; |
| clear: both; |
| } |
| .example-title { |
| display:block; |
| padding: 4px; |
| margin-right: 16px; |
| color: white; |
| overflow-x: auto; |
| } |
| .example-header-button { |
| top: 8px; |
| right: 16px; |
| position: absolute; |
| } |
| .example-header + .highlight-python { |
| margin-top: 0 !important; |
| } |
| .viewcode-button { |
| display: inline-block; |
| padding: 8px 16px; |
| border: 0; |
| margin: 0; |
| outline: 0; |
| border-radius: 2px; |
| -webkit-box-shadow: 0 3px 5px 0 rgba(0,0,0,.3); |
| box-shadow: 0 3px 6px 0 rgba(0,0,0,.3); |
| color: #404040; |
| background-color: #e7e7e7; |
| cursor: pointer; |
| font-size: 16px; |
| font-weight: 500; |
| line-height: 1; |
| text-decoration: none; |
| text-overflow: ellipsis; |
| overflow: hidden; |
| text-transform: uppercase; |
| -webkit-transition: background-color .2s; |
| transition: background-color .2s; |
| vertical-align: middle; |
| white-space: nowrap; |
| } |
| .viewcode-button:visited { |
| color: #404040; |
| } |
| .viewcode-button:hover, .viewcode-button:focus { |
| color: #404040; |
| background-color: #d6d6d6; |
| } |
| </style> |
| |
| </head> |
| |
| <body class="wy-body-for-nav"> |
| |
| |
| <div class="wy-grid-for-nav"> |
| |
| <nav data-toggle="wy-nav-shift" class="wy-nav-side"> |
| <div class="wy-side-scroll"> |
| <div class="wy-side-nav-search" > |
| |
| |
| |
| <a href="../../../../index.html" class="icon icon-home"> Airflow |
| |
| |
| |
| </a> |
| |
| |
| |
| |
| <div class="version"> |
| 1.10.4 |
| </div> |
| |
| |
| |
| |
| <div role="search"> |
| <form id="rtd-search-form" class="wy-form" action="../../../../search.html" method="get"> |
| <input type="text" name="q" placeholder="Search docs" /> |
| <input type="hidden" name="check_keywords" value="yes" /> |
| <input type="hidden" name="area" value="default" /> |
| </form> |
| </div> |
| |
| |
| </div> |
| |
| <div class="wy-menu wy-menu-vertical" data-spy="affix" role="navigation" aria-label="main navigation"> |
| |
| |
| |
| |
| |
| |
| <ul> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../project.html">Project</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../license.html">License</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../start.html">Quick Start</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../installation.html">Installation</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../tutorial.html">Tutorial</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../howto/index.html">How-to Guides</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../ui.html">UI / Screenshots</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../concepts.html">Concepts</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../profiling.html">Data Profiling</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../cli.html">Command Line Interface</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../scheduler.html">Scheduling & Triggers</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../plugins.html">Plugins</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../security.html">Security</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../timezone.html">Time zones</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../api.html">Experimental Rest API</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../integration.html">Integration</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../metrics.html">Metrics</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../kubernetes.html">Kubernetes</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../lineage.html">Lineage</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../changelog.html">Changelog</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../faq.html">FAQ</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../macros.html">Macros reference</a></li> |
| <li class="toctree-l1"><a class="reference internal" href="../../../../_api/index.html">API Reference</a></li> |
| </ul> |
| |
| |
| |
| </div> |
| </div> |
| </nav> |
| |
| <section data-toggle="wy-nav-shift" class="wy-nav-content-wrap"> |
| |
| |
| <nav class="wy-nav-top" aria-label="top navigation"> |
| |
| <i data-toggle="wy-nav-top" class="fa fa-bars"></i> |
| <a href="../../../../index.html">Airflow</a> |
| |
| </nav> |
| |
| |
| <div class="wy-nav-content"> |
| |
| <div class="rst-content"> |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| <div role="navigation" aria-label="breadcrumbs navigation"> |
| |
| <ul class="wy-breadcrumbs"> |
| |
| <li><a href="../../../../index.html">Docs</a> »</li> |
| |
| <li><a href="../../../index.html">Module code</a> »</li> |
| |
| <li>airflow.contrib.executors.kubernetes_executor</li> |
| |
| |
| <li class="wy-breadcrumbs-aside"> |
| |
| </li> |
| |
| </ul> |
| |
| |
| <hr/> |
| </div> |
| <div role="main" class="document" itemscope="itemscope" itemtype="http://schema.org/Article"> |
| <div itemprop="articleBody"> |
| |
| <h1>Source code for airflow.contrib.executors.kubernetes_executor</h1><div class="highlight"><pre> |
| <span></span><span class="c1"># Licensed to the Apache Software Foundation (ASF) under one</span> |
| <span class="c1"># or more contributor license agreements. See the NOTICE file</span> |
| <span class="c1"># distributed with this work for additional information</span> |
| <span class="c1"># regarding copyright ownership. The ASF licenses this file</span> |
| <span class="c1"># to you under the Apache License, Version 2.0 (the</span> |
| <span class="c1"># "License"); you may not use this file except in compliance</span> |
| <span class="c1"># with the License. You may obtain a copy of the License at</span> |
| <span class="c1">#</span> |
| <span class="c1"># http://www.apache.org/licenses/LICENSE-2.0</span> |
| <span class="c1">#</span> |
| <span class="c1"># Unless required by applicable law or agreed to in writing,</span> |
| <span class="c1"># software distributed under the License is distributed on an</span> |
| <span class="c1"># "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY</span> |
| <span class="c1"># KIND, either express or implied. See the License for the</span> |
| <span class="c1"># specific language governing permissions and limitations</span> |
| <span class="c1"># under the License.</span> |
| |
| <span class="kn">import</span> <span class="nn">base64</span> |
| <span class="kn">import</span> <span class="nn">hashlib</span> |
| <span class="kn">from</span> <span class="nn">queue</span> <span class="k">import</span> <span class="n">Empty</span> |
| |
| <span class="kn">import</span> <span class="nn">re</span> |
| <span class="kn">import</span> <span class="nn">json</span> |
| <span class="kn">import</span> <span class="nn">multiprocessing</span> |
| <span class="kn">from</span> <span class="nn">dateutil</span> <span class="k">import</span> <span class="n">parser</span> |
| <span class="kn">from</span> <span class="nn">uuid</span> <span class="k">import</span> <span class="n">uuid4</span> |
| <span class="kn">import</span> <span class="nn">kubernetes</span> |
| <span class="kn">from</span> <span class="nn">kubernetes</span> <span class="k">import</span> <span class="n">watch</span><span class="p">,</span> <span class="n">client</span> |
| <span class="kn">from</span> <span class="nn">kubernetes.client.rest</span> <span class="k">import</span> <span class="n">ApiException</span> |
| <span class="kn">from</span> <span class="nn">airflow.configuration</span> <span class="k">import</span> <span class="n">conf</span> |
| <span class="kn">from</span> <span class="nn">airflow.contrib.kubernetes.pod_launcher</span> <span class="k">import</span> <span class="n">PodLauncher</span> |
| <span class="kn">from</span> <span class="nn">airflow.contrib.kubernetes.kube_client</span> <span class="k">import</span> <span class="n">get_kube_client</span> |
| <span class="kn">from</span> <span class="nn">airflow.contrib.kubernetes.worker_configuration</span> <span class="k">import</span> <span class="n">WorkerConfiguration</span> |
| <span class="kn">from</span> <span class="nn">airflow.executors.base_executor</span> <span class="k">import</span> <span class="n">BaseExecutor</span> |
| <span class="kn">from</span> <span class="nn">airflow.executors</span> <span class="k">import</span> <span class="n">Executors</span> |
| <span class="kn">from</span> <span class="nn">airflow.models</span> <span class="k">import</span> <span class="n">KubeResourceVersion</span><span class="p">,</span> <span class="n">KubeWorkerIdentifier</span><span class="p">,</span> <span class="n">TaskInstance</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.state</span> <span class="k">import</span> <span class="n">State</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.db</span> <span class="k">import</span> <span class="n">provide_session</span><span class="p">,</span> <span class="n">create_session</span> |
| <span class="kn">from</span> <span class="nn">airflow</span> <span class="k">import</span> <span class="n">configuration</span><span class="p">,</span> <span class="n">settings</span> |
| <span class="kn">from</span> <span class="nn">airflow.exceptions</span> <span class="k">import</span> <span class="n">AirflowConfigException</span><span class="p">,</span> <span class="n">AirflowException</span> |
| <span class="kn">from</span> <span class="nn">airflow.utils.log.logging_mixin</span> <span class="k">import</span> <span class="n">LoggingMixin</span> |
| |
| |
| <div class="viewcode-block" id="KubernetesExecutorConfig"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutorConfig">[docs]</a><span class="k">class</span> <span class="nc">KubernetesExecutorConfig</span><span class="p">:</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">image</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">image_pull_policy</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">request_memory</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">request_cpu</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">limit_memory</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">limit_cpu</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">limit_gpu</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">gcp_service_account_key</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">node_selectors</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">affinity</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> |
| <span class="n">annotations</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">volumes</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">volume_mounts</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">tolerations</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">labels</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">image</span> <span class="o">=</span> <span class="n">image</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">image_pull_policy</span> <span class="o">=</span> <span class="n">image_pull_policy</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">request_memory</span> <span class="o">=</span> <span class="n">request_memory</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">request_cpu</span> <span class="o">=</span> <span class="n">request_cpu</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">limit_memory</span> <span class="o">=</span> <span class="n">limit_memory</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">limit_cpu</span> <span class="o">=</span> <span class="n">limit_cpu</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">limit_gpu</span> <span class="o">=</span> <span class="n">limit_gpu</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">gcp_service_account_key</span> <span class="o">=</span> <span class="n">gcp_service_account_key</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">node_selectors</span> <span class="o">=</span> <span class="n">node_selectors</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">affinity</span> <span class="o">=</span> <span class="n">affinity</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">annotations</span> <span class="o">=</span> <span class="n">annotations</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">volumes</span> <span class="o">=</span> <span class="n">volumes</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">volume_mounts</span> <span class="o">=</span> <span class="n">volume_mounts</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">tolerations</span> <span class="o">=</span> <span class="n">tolerations</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">labels</span> <span class="o">=</span> <span class="n">labels</span> <span class="ow">or</span> <span class="p">{}</span> |
| |
| <div class="viewcode-block" id="KubernetesExecutorConfig.__repr__"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutorConfig.__repr__">[docs]</a> <span class="k">def</span> <span class="nf">__repr__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="s2">"</span><span class="si">{}</span><span class="s2">(image=</span><span class="si">{}</span><span class="s2">, image_pull_policy=</span><span class="si">{}</span><span class="s2">, request_memory=</span><span class="si">{}</span><span class="s2">, request_cpu=</span><span class="si">{}</span><span class="s2">, "</span> \ |
| <span class="s2">"limit_memory=</span><span class="si">{}</span><span class="s2">, limit_cpu=</span><span class="si">{}</span><span class="s2">, limit_gpu=</span><span class="si">{}</span><span class="s2">, gcp_service_account_key=</span><span class="si">{}</span><span class="s2">, "</span> \ |
| <span class="s2">"node_selectors=</span><span class="si">{}</span><span class="s2">, affinity=</span><span class="si">{}</span><span class="s2">, annotations=</span><span class="si">{}</span><span class="s2">, volumes=</span><span class="si">{}</span><span class="s2">, "</span> \ |
| <span class="s2">"volume_mounts=</span><span class="si">{}</span><span class="s2">, tolerations=</span><span class="si">{}</span><span class="s2">, labels=</span><span class="si">{}</span><span class="s2">)"</span> \ |
| <span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">KubernetesExecutorConfig</span><span class="o">.</span><span class="vm">__name__</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">image</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">image_pull_policy</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">request_memory</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_cpu</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">limit_memory</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">limit_cpu</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">limit_gpu</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">gcp_service_account_key</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">node_selectors</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">affinity</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">annotations</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">volumes</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">volume_mounts</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">tolerations</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">labels</span><span class="p">)</span></div> |
| |
| <span class="nd">@staticmethod</span> |
| <div class="viewcode-block" id="KubernetesExecutorConfig.from_dict"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutorConfig.from_dict">[docs]</a> <span class="k">def</span> <span class="nf">from_dict</span><span class="p">(</span><span class="n">obj</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">obj</span> <span class="ow">is</span> <span class="kc">None</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">KubernetesExecutorConfig</span><span class="p">()</span> |
| |
| <span class="k">if</span> <span class="ow">not</span> <span class="nb">isinstance</span><span class="p">(</span><span class="n">obj</span><span class="p">,</span> <span class="nb">dict</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="ne">TypeError</span><span class="p">(</span> |
| <span class="s1">'Cannot convert a non-dictionary object into a KubernetesExecutorConfig'</span><span class="p">)</span> |
| |
| <span class="n">namespaced</span> <span class="o">=</span> <span class="n">obj</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="n">Executors</span><span class="o">.</span><span class="n">KubernetesExecutor</span><span class="p">,</span> <span class="p">{})</span> |
| |
| <span class="k">return</span> <span class="n">KubernetesExecutorConfig</span><span class="p">(</span> |
| <span class="n">image</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'image'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">image_pull_policy</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'image_pull_policy'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">request_memory</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'request_memory'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">request_cpu</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'request_cpu'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">limit_memory</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'limit_memory'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">limit_cpu</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'limit_cpu'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">limit_gpu</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'limit_gpu'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">gcp_service_account_key</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'gcp_service_account_key'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">node_selectors</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'node_selectors'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">affinity</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'affinity'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">annotations</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'annotations'</span><span class="p">,</span> <span class="p">{}),</span> |
| <span class="n">volumes</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'volumes'</span><span class="p">,</span> <span class="p">[]),</span> |
| <span class="n">volume_mounts</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'volume_mounts'</span><span class="p">,</span> <span class="p">[]),</span> |
| <span class="n">tolerations</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'tolerations'</span><span class="p">,</span> <span class="kc">None</span><span class="p">),</span> |
| <span class="n">labels</span><span class="o">=</span><span class="n">namespaced</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'labels'</span><span class="p">,</span> <span class="p">{}),</span></div> |
| <span class="p">)</span> |
| |
| <div class="viewcode-block" id="KubernetesExecutorConfig.as_dict"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutorConfig.as_dict">[docs]</a> <span class="k">def</span> <span class="nf">as_dict</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">return</span> <span class="p">{</span> |
| <span class="s1">'image'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">image</span><span class="p">,</span> |
| <span class="s1">'image_pull_policy'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">image_pull_policy</span><span class="p">,</span> |
| <span class="s1">'request_memory'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_memory</span><span class="p">,</span> |
| <span class="s1">'request_cpu'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">request_cpu</span><span class="p">,</span> |
| <span class="s1">'limit_memory'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">limit_memory</span><span class="p">,</span> |
| <span class="s1">'limit_cpu'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">limit_cpu</span><span class="p">,</span> |
| <span class="s1">'limit_gpu'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">limit_gpu</span><span class="p">,</span> |
| <span class="s1">'gcp_service_account_key'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">gcp_service_account_key</span><span class="p">,</span> |
| <span class="s1">'node_selectors'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">node_selectors</span><span class="p">,</span> |
| <span class="s1">'affinity'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">affinity</span><span class="p">,</span> |
| <span class="s1">'annotations'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">annotations</span><span class="p">,</span> |
| <span class="s1">'volumes'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">volumes</span><span class="p">,</span> |
| <span class="s1">'volume_mounts'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">volume_mounts</span><span class="p">,</span> |
| <span class="s1">'tolerations'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">tolerations</span><span class="p">,</span> |
| <span class="s1">'labels'</span><span class="p">:</span> <span class="bp">self</span><span class="o">.</span><span class="n">labels</span><span class="p">,</span></div></div> |
| <span class="p">}</span> |
| |
| |
| <div class="viewcode-block" id="KubeConfig"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubeConfig">[docs]</a><span class="k">class</span> <span class="nc">KubeConfig</span><span class="p">:</span> |
| <div class="viewcode-block" id="KubeConfig.core_section"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubeConfig.core_section">[docs]</a> <span class="n">core_section</span> <span class="o">=</span> <span class="s1">'core'</span></div> |
| <div class="viewcode-block" id="KubeConfig.kubernetes_section"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubeConfig.kubernetes_section">[docs]</a> <span class="n">kubernetes_section</span> <span class="o">=</span> <span class="s1">'kubernetes'</span></div> |
| |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">configuration_dict</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">as_dict</span><span class="p">(</span><span class="n">display_sensitive</span><span class="o">=</span><span class="kc">True</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">core_configuration</span> <span class="o">=</span> <span class="n">configuration_dict</span><span class="p">[</span><span class="s1">'core'</span><span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_secrets</span> <span class="o">=</span> <span class="n">configuration_dict</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'kubernetes_secrets'</span><span class="p">,</span> <span class="p">{})</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_env_vars</span> <span class="o">=</span> <span class="n">configuration_dict</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'kubernetes_environment_variables'</span><span class="p">,</span> <span class="p">{})</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">env_from_configmap_ref</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> |
| <span class="s1">'env_from_configmap_ref'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">env_from_secret_ref</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> |
| <span class="s1">'env_from_secret_ref'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">airflow_home</span> <span class="o">=</span> <span class="n">settings</span><span class="o">.</span><span class="n">AIRFLOW_HOME</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dags_folder</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">core_section</span><span class="p">,</span> <span class="s1">'dags_folder'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">parallelism</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">getint</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">core_section</span><span class="p">,</span> <span class="s1">'PARALLELISM'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_container_repository</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'worker_container_repository'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_container_tag</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'worker_container_tag'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_image</span> <span class="o">=</span> <span class="s1">'</span><span class="si">{}</span><span class="s1">:</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_container_repository</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">worker_container_tag</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_image_pull_policy</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s2">"worker_container_image_pull_policy"</span> |
| <span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_node_selectors</span> <span class="o">=</span> <span class="n">configuration_dict</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'kubernetes_node_selectors'</span><span class="p">,</span> <span class="p">{})</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_annotations</span> <span class="o">=</span> <span class="n">configuration_dict</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'kubernetes_annotations'</span><span class="p">,</span> <span class="p">{})</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_labels</span> <span class="o">=</span> <span class="n">configuration_dict</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'kubernetes_labels'</span><span class="p">,</span> <span class="p">{})</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">delete_worker_pods</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'delete_worker_pods'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_pods_creation_batch_size</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">getint</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'worker_pods_creation_batch_size'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_service_account_name</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'worker_service_account_name'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">image_pull_secrets</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'image_pull_secrets'</span><span class="p">)</span> |
| |
| <span class="c1"># NOTE: user can build the dags into the docker image directly,</span> |
| <span class="c1"># this will set to True if so</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dags_in_image</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">getboolean</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'dags_in_image'</span><span class="p">)</span> |
| |
| <span class="c1"># Run as user for pod security context</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_run_as_user</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_security_context_val</span><span class="p">(</span><span class="s1">'run_as_user'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_fs_group</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_get_security_context_val</span><span class="p">(</span><span class="s1">'fs_group'</span><span class="p">)</span> |
| |
| <span class="c1"># NOTE: `git_repo` and `git_branch` must be specified together as a pair</span> |
| <span class="c1"># The http URL of the git repository to clone from</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_repo</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'git_repo'</span><span class="p">)</span> |
| <span class="c1"># The branch of the repository to be checked out</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_branch</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'git_branch'</span><span class="p">)</span> |
| <span class="c1"># Optionally, the directory in the git repository containing the dags</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_subpath</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'git_subpath'</span><span class="p">)</span> |
| <span class="c1"># Optionally, the root directory for git operations</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_sync_root</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'git_sync_root'</span><span class="p">)</span> |
| <span class="c1"># Optionally, the name at which to publish the checked-out files under --root</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_sync_dest</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'git_sync_dest'</span><span class="p">)</span> |
| <span class="c1"># Optionally, if git_dags_folder_mount_point is set the worker will use</span> |
| <span class="c1"># {git_dags_folder_mount_point}/{git_sync_dest}/{git_subpath} as dags_folder</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_dags_folder_mount_point</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> |
| <span class="s1">'git_dags_folder_mount_point'</span><span class="p">)</span> |
| |
| <span class="c1"># Optionally a user may supply a (`git_user` AND `git_password`) OR</span> |
| <span class="c1"># (`git_ssh_key_secret_name` AND `git_ssh_key_secret_key`) for private repositories</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_user</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'git_user'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_password</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'git_password'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_ssh_key_secret_name</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'git_ssh_key_secret_name'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_ssh_known_hosts_configmap_name</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> |
| <span class="s1">'git_ssh_known_hosts_configmap_name'</span><span class="p">)</span> |
| |
| <span class="c1"># NOTE: The user may optionally use a volume claim to mount a PV containing</span> |
| <span class="c1"># DAGs directly</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dags_volume_claim</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'dags_volume_claim'</span><span class="p">)</span> |
| |
| <span class="c1"># This prop may optionally be set for PV Claims and is used to write logs</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">logs_volume_claim</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'logs_volume_claim'</span><span class="p">)</span> |
| |
| <span class="c1"># This prop may optionally be set for PV Claims and is used to locate DAGs</span> |
| <span class="c1"># on a SubPath</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dags_volume_subpath</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'dags_volume_subpath'</span><span class="p">)</span> |
| |
| <span class="c1"># This prop may optionally be set for PV Claims and is used to locate logs</span> |
| <span class="c1"># on a SubPath</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">logs_volume_subpath</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'logs_volume_subpath'</span><span class="p">)</span> |
| |
| <span class="c1"># Optionally, hostPath volume containing DAGs</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">dags_volume_host</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'dags_volume_host'</span><span class="p">)</span> |
| |
| <span class="c1"># Optionally, write logs to a hostPath Volume</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">logs_volume_host</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'logs_volume_host'</span><span class="p">)</span> |
| |
| <span class="c1"># This prop may optionally be set for PV Claims and is used to write logs</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">base_log_folder</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">core_section</span><span class="p">,</span> <span class="s1">'base_log_folder'</span><span class="p">)</span> |
| |
| <span class="c1"># The Kubernetes Namespace in which the Scheduler and Webserver reside. Note</span> |
| <span class="c1"># that if your</span> |
| <span class="c1"># cluster has RBAC enabled, your scheduler may need service account permissions to</span> |
| <span class="c1"># create, watch, get, and delete pods in this namespace.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_namespace</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'namespace'</span><span class="p">)</span> |
| <span class="c1"># The Kubernetes Namespace in which pods will be created by the executor. Note</span> |
| <span class="c1"># that if your</span> |
| <span class="c1"># cluster has RBAC enabled, your workers may need service account permissions to</span> |
| <span class="c1"># interact with cluster components.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">executor_namespace</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'namespace'</span><span class="p">)</span> |
| <span class="c1"># Task secrets managed by KubernetesExecutor.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">gcp_service_account_keys</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> |
| <span class="s1">'gcp_service_account_keys'</span><span class="p">)</span> |
| |
| <span class="c1"># If the user is using the git-sync container to clone their repository via git,</span> |
| <span class="c1"># allow them to specify repository, tag, and pod name for the init container.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_sync_container_repository</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'git_sync_container_repository'</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_sync_container_tag</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'git_sync_container_tag'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_sync_container</span> <span class="o">=</span> <span class="s1">'</span><span class="si">{}</span><span class="s1">:</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_sync_container_repository</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">git_sync_container_tag</span><span class="p">)</span> |
| |
| <span class="bp">self</span><span class="o">.</span><span class="n">git_sync_init_container_name</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'git_sync_init_container_name'</span><span class="p">)</span> |
| |
| <span class="c1"># The worker pod may optionally have a valid Airflow config loaded via a</span> |
| <span class="c1"># configmap</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">airflow_configmap</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'airflow_configmap'</span><span class="p">)</span> |
| |
| <span class="n">affinity_json</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'affinity'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">affinity_json</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_affinity</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">affinity_json</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_affinity</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="n">tolerations_json</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'tolerations'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">tolerations_json</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_tolerations</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">tolerations_json</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_tolerations</span> <span class="o">=</span> <span class="kc">None</span> |
| |
| <span class="n">kube_client_request_args</span> <span class="o">=</span> <span class="n">conf</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="s1">'kube_client_request_args'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">kube_client_request_args</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_client_request_args</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">kube_client_request_args</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="p">[</span><span class="s1">'_request_timeout'</span><span class="p">]</span> <span class="ow">and</span> \ |
| <span class="nb">isinstance</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="p">[</span><span class="s1">'_request_timeout'</span><span class="p">],</span> <span class="nb">list</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="p">[</span><span class="s1">'_request_timeout'</span><span class="p">]</span> <span class="o">=</span> \ |
| <span class="nb">tuple</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="p">[</span><span class="s1">'_request_timeout'</span><span class="p">])</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_client_request_args</span> <span class="o">=</span> <span class="p">{}</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_validate</span><span class="p">()</span> |
| |
| <span class="c1"># pod security context items should return integers</span> |
| <span class="c1"># and only return a blank string if contexts are not set.</span> |
| <div class="viewcode-block" id="KubeConfig._get_security_context_val"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubeConfig._get_security_context_val">[docs]</a> <span class="k">def</span> <span class="nf">_get_security_context_val</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">scontext</span><span class="p">):</span> |
| <span class="n">val</span> <span class="o">=</span> <span class="n">configuration</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kubernetes_section</span><span class="p">,</span> <span class="n">scontext</span><span class="p">)</span> |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">val</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="k">return</span> <span class="n">val</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="k">return</span> <span class="nb">int</span><span class="p">(</span><span class="n">val</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="KubeConfig._validate"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubeConfig._validate">[docs]</a> <span class="k">def</span> <span class="nf">_validate</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="c1"># TODO: use XOR for dags_volume_claim and git_dags_folder_mount_point</span> |
| <span class="k">if</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags_volume_claim</span> \ |
| <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags_volume_host</span> \ |
| <span class="ow">and</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">dags_in_image</span> \ |
| <span class="ow">and</span> <span class="p">(</span><span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">git_repo</span> <span class="ow">or</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">git_branch</span> <span class="ow">or</span> <span class="ow">not</span> <span class="bp">self</span><span class="o">.</span><span class="n">git_dags_folder_mount_point</span><span class="p">):</span> |
| <span class="k">raise</span> <span class="n">AirflowConfigException</span><span class="p">(</span> |
| <span class="s1">'In kubernetes mode the following must be set in the `kubernetes` '</span> |
| <span class="s1">'config section: `dags_volume_claim` '</span> |
| <span class="s1">'or `dags_volume_host` '</span> |
| <span class="s1">'or `dags_in_image` '</span> |
| <span class="s1">'or `git_repo and git_branch and git_dags_folder_mount_point`'</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">git_repo</span> \ |
| <span class="ow">and</span> <span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">git_user</span> <span class="ow">or</span> <span class="bp">self</span><span class="o">.</span><span class="n">git_password</span><span class="p">)</span> \ |
| <span class="ow">and</span> <span class="bp">self</span><span class="o">.</span><span class="n">git_ssh_key_secret_name</span><span class="p">:</span> |
| <span class="k">raise</span> <span class="n">AirflowConfigException</span><span class="p">(</span> |
| <span class="s1">'In kubernetes mode, using `git_repo` to pull the DAGs: '</span></div></div> |
| <span class="s1">'for private repositories, either `git_user` and `git_password` '</span> |
| <span class="s1">'must be set for authentication through user credentials; '</span> |
| <span class="s1">'or `git_ssh_key_secret_name` must be set for authentication '</span> |
| <span class="s1">'through ssh key, but not both'</span><span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="KubernetesJobWatcher"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher">[docs]</a><span class="k">class</span> <span class="nc">KubernetesJobWatcher</span><span class="p">(</span><span class="n">multiprocessing</span><span class="o">.</span><span class="n">Process</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">,</span> <span class="nb">object</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">namespace</span><span class="p">,</span> <span class="n">watcher_queue</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">,</span> <span class="n">worker_uuid</span><span class="p">,</span> <span class="n">kube_config</span><span class="p">):</span> |
| <span class="n">multiprocessing</span><span class="o">.</span><span class="n">Process</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">namespace</span> <span class="o">=</span> <span class="n">namespace</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span> <span class="o">=</span> <span class="n">worker_uuid</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span> <span class="o">=</span> <span class="n">watcher_queue</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">resource_version</span> <span class="o">=</span> <span class="n">resource_version</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span> <span class="o">=</span> <span class="n">kube_config</span> |
| |
| <div class="viewcode-block" id="KubernetesJobWatcher.run"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher.run">[docs]</a> <span class="k">def</span> <span class="nf">run</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">kube_client</span> <span class="o">=</span> <span class="n">get_kube_client</span><span class="p">()</span> |
| <span class="k">while</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">resource_version</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_run</span><span class="p">(</span><span class="n">kube_client</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">resource_version</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">Exception</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s1">'Unknown error in KubernetesJobWatcher. Failing'</span><span class="p">)</span> |
| <span class="k">raise</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span><span class="s1">'Watch died gracefully, starting back up with: '</span> |
| <span class="s1">'last resource_version: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">resource_version</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="KubernetesJobWatcher._run"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher._run">[docs]</a> <span class="k">def</span> <span class="nf">_run</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">kube_client</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">,</span> <span class="n">worker_uuid</span><span class="p">,</span> <span class="n">kube_config</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'Event: and now my watch begins starting at resource_version: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">resource_version</span> |
| <span class="p">)</span> |
| <span class="n">watcher</span> <span class="o">=</span> <span class="n">watch</span><span class="o">.</span><span class="n">Watch</span><span class="p">()</span> |
| |
| <span class="n">kwargs</span> <span class="o">=</span> <span class="p">{</span><span class="s1">'label_selector'</span><span class="p">:</span> <span class="s1">'airflow-worker=</span><span class="si">{}</span><span class="s1">'</span><span class="o">.</span><span class="n">format</span><span class="p">(</span><span class="n">worker_uuid</span><span class="p">)}</span> |
| <span class="k">if</span> <span class="n">resource_version</span><span class="p">:</span> |
| <span class="n">kwargs</span><span class="p">[</span><span class="s1">'resource_version'</span><span class="p">]</span> <span class="o">=</span> <span class="n">resource_version</span> |
| <span class="k">if</span> <span class="n">kube_config</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">key</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="n">kube_config</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="o">.</span><span class="n">iteritems</span><span class="p">():</span> |
| <span class="n">kwargs</span><span class="p">[</span><span class="n">key</span><span class="p">]</span> <span class="o">=</span> <span class="n">value</span> |
| |
| <span class="n">last_resource_version</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">for</span> <span class="n">event</span> <span class="ow">in</span> <span class="n">watcher</span><span class="o">.</span><span class="n">stream</span><span class="p">(</span><span class="n">kube_client</span><span class="o">.</span><span class="n">list_namespaced_pod</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">namespace</span><span class="p">,</span> |
| <span class="o">**</span><span class="n">kwargs</span><span class="p">):</span> |
| <span class="n">task</span> <span class="o">=</span> <span class="n">event</span><span class="p">[</span><span class="s1">'object'</span><span class="p">]</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'Event: </span><span class="si">%s</span><span class="s1"> had an event of type </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">name</span><span class="p">,</span> <span class="n">event</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> |
| <span class="p">)</span> |
| <span class="k">if</span> <span class="n">event</span><span class="p">[</span><span class="s1">'type'</span><span class="p">]</span> <span class="o">==</span> <span class="s1">'ERROR'</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">process_error</span><span class="p">(</span><span class="n">event</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">process_status</span><span class="p">(</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">name</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">status</span><span class="o">.</span><span class="n">phase</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">labels</span><span class="p">,</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">resource_version</span> |
| <span class="p">)</span> |
| <span class="n">last_resource_version</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">metadata</span><span class="o">.</span><span class="n">resource_version</span> |
| |
| <span class="k">return</span> <span class="n">last_resource_version</span></div> |
| |
| <div class="viewcode-block" id="KubernetesJobWatcher.process_error"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher.process_error">[docs]</a> <span class="k">def</span> <span class="nf">process_error</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">event</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span> |
| <span class="s1">'Encountered Error response from k8s list namespaced pod stream => </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">event</span> |
| <span class="p">)</span> |
| <span class="n">raw_object</span> <span class="o">=</span> <span class="n">event</span><span class="p">[</span><span class="s1">'raw_object'</span><span class="p">]</span> |
| <span class="k">if</span> <span class="n">raw_object</span><span class="p">[</span><span class="s1">'code'</span><span class="p">]</span> <span class="o">==</span> <span class="mi">410</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'Kubernetes resource version is too old, must reset to 0 => </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">raw_object</span><span class="p">[</span><span class="s1">'message'</span><span class="p">]</span> |
| <span class="p">)</span> |
| <span class="c1"># Return resource version 0</span> |
| <span class="k">return</span> <span class="s1">'0'</span> |
| <span class="k">raise</span> <span class="n">AirflowException</span><span class="p">(</span> |
| <span class="s1">'Kubernetes failure for </span><span class="si">%s</span><span class="s1"> with code </span><span class="si">%s</span><span class="s1"> and message: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">raw_object</span><span class="p">[</span><span class="s1">'reason'</span><span class="p">],</span> <span class="n">raw_object</span><span class="p">[</span><span class="s1">'code'</span><span class="p">],</span> <span class="n">raw_object</span><span class="p">[</span><span class="s1">'message'</span><span class="p">]</span></div> |
| <span class="p">)</span> |
| |
| <div class="viewcode-block" id="KubernetesJobWatcher.process_status"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesJobWatcher.process_status">[docs]</a> <span class="k">def</span> <span class="nf">process_status</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">,</span> <span class="n">status</span><span class="p">,</span> <span class="n">labels</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">status</span> <span class="o">==</span> <span class="s1">'Pending'</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Event: </span><span class="si">%s</span><span class="s1"> Pending'</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span> |
| <span class="k">elif</span> <span class="n">status</span> <span class="o">==</span> <span class="s1">'Failed'</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Event: </span><span class="si">%s</span><span class="s1"> Failed'</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span><span class="o">.</span><span class="n">put</span><span class="p">((</span><span class="n">pod_id</span><span class="p">,</span> <span class="n">State</span><span class="o">.</span><span class="n">FAILED</span><span class="p">,</span> <span class="n">labels</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">))</span> |
| <span class="k">elif</span> <span class="n">status</span> <span class="o">==</span> <span class="s1">'Succeeded'</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Event: </span><span class="si">%s</span><span class="s1"> Succeeded'</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span><span class="o">.</span><span class="n">put</span><span class="p">((</span><span class="n">pod_id</span><span class="p">,</span> <span class="kc">None</span><span class="p">,</span> <span class="n">labels</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">))</span> |
| <span class="k">elif</span> <span class="n">status</span> <span class="o">==</span> <span class="s1">'Running'</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Event: </span><span class="si">%s</span><span class="s1"> is Running'</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span> |
| <span class="s1">'Event: Invalid state: </span><span class="si">%s</span><span class="s1"> on pod: </span><span class="si">%s</span><span class="s1"> with labels: </span><span class="si">%s</span><span class="s1"> with '</span> |
| <span class="s1">'resource_version: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">status</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">,</span> <span class="n">labels</span><span class="p">,</span> <span class="n">resource_version</span></div></div> |
| <span class="p">)</span> |
| |
| |
| <div class="viewcode-block" id="AirflowKubernetesScheduler"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler">[docs]</a><span class="k">class</span> <span class="nc">AirflowKubernetesScheduler</span><span class="p">(</span><span class="n">LoggingMixin</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">kube_config</span><span class="p">,</span> <span class="n">task_queue</span><span class="p">,</span> <span class="n">result_queue</span><span class="p">,</span> <span class="n">kube_client</span><span class="p">,</span> <span class="n">worker_uuid</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Creating Kubernetes executor"</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span> <span class="o">=</span> <span class="n">kube_config</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span> <span class="o">=</span> <span class="n">task_queue</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span> <span class="o">=</span> <span class="n">result_queue</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">namespace</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_namespace</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Kubernetes using namespace </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">namespace</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span> <span class="o">=</span> <span class="n">kube_client</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">launcher</span> <span class="o">=</span> <span class="n">PodLauncher</span><span class="p">(</span><span class="n">kube_client</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_configuration</span> <span class="o">=</span> <span class="n">WorkerConfiguration</span><span class="p">(</span><span class="n">kube_config</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_manager</span> <span class="o">=</span> <span class="n">multiprocessing</span><span class="o">.</span><span class="n">Manager</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_manager</span><span class="o">.</span><span class="n">Queue</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span> <span class="o">=</span> <span class="n">worker_uuid</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_watcher</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_make_kube_watcher</span><span class="p">()</span> |
| |
| <div class="viewcode-block" id="AirflowKubernetesScheduler._make_kube_watcher"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler._make_kube_watcher">[docs]</a> <span class="k">def</span> <span class="nf">_make_kube_watcher</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="n">resource_version</span> <span class="o">=</span> <span class="n">KubeResourceVersion</span><span class="o">.</span><span class="n">get_current_resource_version</span><span class="p">()</span> |
| <span class="n">watcher</span> <span class="o">=</span> <span class="n">KubernetesJobWatcher</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">namespace</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span><span class="p">,</span> |
| <span class="n">resource_version</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="p">)</span> |
| <span class="n">watcher</span><span class="o">.</span><span class="n">start</span><span class="p">()</span> |
| <span class="k">return</span> <span class="n">watcher</span></div> |
| |
| <div class="viewcode-block" id="AirflowKubernetesScheduler._health_check_kube_watcher"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler._health_check_kube_watcher">[docs]</a> <span class="k">def</span> <span class="nf">_health_check_kube_watcher</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_watcher</span><span class="o">.</span><span class="n">is_alive</span><span class="p">():</span> |
| <span class="k">pass</span> |
| <span class="k">else</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">error</span><span class="p">(</span> |
| <span class="s1">'Error while health checking kube watcher process. '</span> |
| <span class="s1">'Process died for unknown reasons'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_watcher</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_make_kube_watcher</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="AirflowKubernetesScheduler.run_next"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.run_next">[docs]</a> <span class="k">def</span> <span class="nf">run_next</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">next_job</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| |
| <span class="sd"> The run_next command will check the task_queue for any un-run jobs.</span> |
| <span class="sd"> It will then create a unique job-id, launch that job in the cluster,</span> |
| <span class="sd"> and store relevant info in the current_jobs map so we can track the job's</span> |
| <span class="sd"> status</span> |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Kubernetes job is </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">next_job</span><span class="p">))</span> |
| <span class="n">key</span><span class="p">,</span> <span class="n">command</span><span class="p">,</span> <span class="n">kube_executor_config</span> <span class="o">=</span> <span class="n">next_job</span> |
| <span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="p">,</span> <span class="n">execution_date</span><span class="p">,</span> <span class="n">try_number</span> <span class="o">=</span> <span class="n">key</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Kubernetes running for command </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">command</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Kubernetes launching image </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_image</span><span class="p">)</span> |
| <span class="n">pod</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">worker_configuration</span><span class="o">.</span><span class="n">make_pod</span><span class="p">(</span> |
| <span class="n">namespace</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">namespace</span><span class="p">,</span> <span class="n">worker_uuid</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span><span class="p">,</span> |
| <span class="n">pod_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_create_pod_id</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="p">),</span> |
| <span class="n">dag_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_make_safe_label_value</span><span class="p">(</span><span class="n">dag_id</span><span class="p">),</span> |
| <span class="n">task_id</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_make_safe_label_value</span><span class="p">(</span><span class="n">task_id</span><span class="p">),</span> |
| <span class="n">try_number</span><span class="o">=</span><span class="n">try_number</span><span class="p">,</span> |
| <span class="n">execution_date</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">_datetime_to_label_safe_datestring</span><span class="p">(</span><span class="n">execution_date</span><span class="p">),</span> |
| <span class="n">airflow_command</span><span class="o">=</span><span class="n">command</span><span class="p">,</span> <span class="n">kube_executor_config</span><span class="o">=</span><span class="n">kube_executor_config</span> |
| <span class="p">)</span> |
| <span class="c1"># the watcher will monitor pods, so we do not block.</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">launcher</span><span class="o">.</span><span class="n">run_pod_async</span><span class="p">(</span><span class="n">pod</span><span class="p">,</span> <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s2">"Kubernetes Job created!"</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="AirflowKubernetesScheduler.delete_pod"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod">[docs]</a> <span class="k">def</span> <span class="nf">delete_pod</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="o">.</span><span class="n">delete_namespaced_pod</span><span class="p">(</span> |
| <span class="n">pod_id</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">namespace</span><span class="p">,</span> <span class="n">body</span><span class="o">=</span><span class="n">client</span><span class="o">.</span><span class="n">V1DeleteOptions</span><span class="p">(),</span> |
| <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="p">)</span> |
| <span class="k">except</span> <span class="n">ApiException</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="c1"># If the pod is already deleted</span> |
| <span class="k">if</span> <span class="n">e</span><span class="o">.</span><span class="n">status</span> <span class="o">!=</span> <span class="mi">404</span><span class="p">:</span> |
| <span class="k">raise</span></div> |
| |
| <div class="viewcode-block" id="AirflowKubernetesScheduler.sync"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.sync">[docs]</a> <span class="k">def</span> <span class="nf">sync</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> The sync function checks the status of all currently running kubernetes jobs.</span> |
| <span class="sd"> If a job is completed, it's status is placed in the result queue to</span> |
| <span class="sd"> be sent back to the scheduler.</span> |
| |
| <span class="sd"> :return:</span> |
| |
| <span class="sd"> """</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_health_check_kube_watcher</span><span class="p">()</span> |
| <span class="k">while</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span><span class="o">.</span><span class="n">get_nowait</span><span class="p">()</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">process_watcher_task</span><span class="p">(</span><span class="n">task</span><span class="p">)</span> |
| <span class="k">finally</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span><span class="o">.</span><span class="n">task_done</span><span class="p">()</span> |
| <span class="k">except</span> <span class="n">Empty</span><span class="p">:</span> |
| <span class="k">break</span></div> |
| |
| <div class="viewcode-block" id="AirflowKubernetesScheduler.process_watcher_task"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.process_watcher_task">[docs]</a> <span class="k">def</span> <span class="nf">process_watcher_task</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">task</span><span class="p">):</span> |
| <span class="n">pod_id</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">labels</span><span class="p">,</span> <span class="n">resource_version</span> <span class="o">=</span> <span class="n">task</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'Attempting to finish pod; pod_id: </span><span class="si">%s</span><span class="s1">; state: </span><span class="si">%s</span><span class="s1">; labels: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">pod_id</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">labels</span> |
| <span class="p">)</span> |
| <span class="n">key</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_labels_to_key</span><span class="p">(</span><span class="n">labels</span><span class="o">=</span><span class="n">labels</span><span class="p">)</span> |
| <span class="k">if</span> <span class="n">key</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'finishing job </span><span class="si">%s</span><span class="s1"> - </span><span class="si">%s</span><span class="s1"> (</span><span class="si">%s</span><span class="s1">)'</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span><span class="o">.</span><span class="n">put</span><span class="p">((</span><span class="n">key</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">,</span> <span class="n">resource_version</span><span class="p">))</span></div> |
| |
| <span class="nd">@staticmethod</span> |
| <div class="viewcode-block" id="AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler._strip_unsafe_kubernetes_special_chars">[docs]</a> <span class="k">def</span> <span class="nf">_strip_unsafe_kubernetes_special_chars</span><span class="p">(</span><span class="n">string</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Kubernetes only supports lowercase alphanumeric characters and "-" and "." in</span> |
| <span class="sd"> the pod name</span> |
| <span class="sd"> However, there are special rules about how "-" and "." can be used so let's</span> |
| <span class="sd"> only keep</span> |
| <span class="sd"> alphanumeric chars see here for detail:</span> |
| <span class="sd"> https://kubernetes.io/docs/concepts/overview/working-with-objects/names/</span> |
| |
| <span class="sd"> :param string: The requested Pod name</span> |
| <span class="sd"> :return: ``str`` Pod name stripped of any unsafe characters</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="s1">''</span><span class="o">.</span><span class="n">join</span><span class="p">(</span><span class="n">ch</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span> <span class="k">for</span> <span class="n">ind</span><span class="p">,</span> <span class="n">ch</span> <span class="ow">in</span> <span class="nb">enumerate</span><span class="p">(</span><span class="n">string</span><span class="p">)</span> <span class="k">if</span> <span class="n">ch</span><span class="o">.</span><span class="n">isalnum</span><span class="p">())</span></div> |
| |
| <span class="nd">@staticmethod</span> |
| <div class="viewcode-block" id="AirflowKubernetesScheduler._make_safe_pod_id"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler._make_safe_pod_id">[docs]</a> <span class="k">def</span> <span class="nf">_make_safe_pod_id</span><span class="p">(</span><span class="n">safe_dag_id</span><span class="p">,</span> <span class="n">safe_task_id</span><span class="p">,</span> <span class="n">safe_uuid</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Kubernetes pod names must be <= 253 chars and must pass the following regex for</span> |
| <span class="sd"> validation</span> |
| <span class="sd"> "^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"</span> |
| |
| <span class="sd"> :param safe_dag_id: a dag_id with only alphanumeric characters</span> |
| <span class="sd"> :param safe_task_id: a task_id with only alphanumeric characters</span> |
| <span class="sd"> :param random_uuid: a uuid</span> |
| <span class="sd"> :return: ``str`` valid Pod name of appropriate length</span> |
| <span class="sd"> """</span> |
| <span class="n">MAX_POD_ID_LEN</span> <span class="o">=</span> <span class="mi">253</span> |
| |
| <span class="n">safe_key</span> <span class="o">=</span> <span class="n">safe_dag_id</span> <span class="o">+</span> <span class="n">safe_task_id</span> |
| |
| <span class="n">safe_pod_id</span> <span class="o">=</span> <span class="n">safe_key</span><span class="p">[:</span><span class="n">MAX_POD_ID_LEN</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="n">safe_uuid</span><span class="p">)</span> <span class="o">-</span> <span class="mi">1</span><span class="p">]</span> <span class="o">+</span> <span class="s2">"-"</span> <span class="o">+</span> <span class="n">safe_uuid</span> |
| |
| <span class="k">return</span> <span class="n">safe_pod_id</span></div> |
| |
| <span class="nd">@staticmethod</span> |
| <div class="viewcode-block" id="AirflowKubernetesScheduler._make_safe_label_value"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler._make_safe_label_value">[docs]</a> <span class="k">def</span> <span class="nf">_make_safe_label_value</span><span class="p">(</span><span class="n">string</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Valid label values must be 63 characters or less and must be empty or begin and</span> |
| <span class="sd"> end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_),</span> |
| <span class="sd"> dots (.), and alphanumerics between.</span> |
| |
| <span class="sd"> If the label value is then greater than 63 chars once made safe, or differs in any</span> |
| <span class="sd"> way from the original value sent to this function, then we need to truncate to</span> |
| <span class="sd"> 53chars, and append it with a unique hash.</span> |
| <span class="sd"> """</span> |
| <span class="n">MAX_LABEL_LEN</span> <span class="o">=</span> <span class="mi">63</span> |
| |
| <span class="n">safe_label</span> <span class="o">=</span> <span class="n">re</span><span class="o">.</span><span class="n">sub</span><span class="p">(</span><span class="sa">r</span><span class="s1">'^[^a-z0-9A-Z]*|[^a-zA-Z0-9_\-\.]|[^a-z0-9A-Z]*$'</span><span class="p">,</span> <span class="s1">''</span><span class="p">,</span> <span class="n">string</span><span class="p">)</span> |
| |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">safe_label</span><span class="p">)</span> <span class="o">></span> <span class="n">MAX_LABEL_LEN</span> <span class="ow">or</span> <span class="n">string</span> <span class="o">!=</span> <span class="n">safe_label</span><span class="p">:</span> |
| <span class="n">safe_hash</span> <span class="o">=</span> <span class="n">hashlib</span><span class="o">.</span><span class="n">md5</span><span class="p">(</span><span class="n">string</span><span class="o">.</span><span class="n">encode</span><span class="p">())</span><span class="o">.</span><span class="n">hexdigest</span><span class="p">()[:</span><span class="mi">9</span><span class="p">]</span> |
| <span class="n">safe_label</span> <span class="o">=</span> <span class="n">safe_label</span><span class="p">[:</span><span class="n">MAX_LABEL_LEN</span> <span class="o">-</span> <span class="nb">len</span><span class="p">(</span><span class="n">safe_hash</span><span class="p">)</span> <span class="o">-</span> <span class="mi">1</span><span class="p">]</span> <span class="o">+</span> <span class="s2">"-"</span> <span class="o">+</span> <span class="n">safe_hash</span> |
| |
| <span class="k">return</span> <span class="n">safe_label</span></div> |
| |
| <span class="nd">@staticmethod</span> |
| <div class="viewcode-block" id="AirflowKubernetesScheduler._create_pod_id"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler._create_pod_id">[docs]</a> <span class="k">def</span> <span class="nf">_create_pod_id</span><span class="p">(</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="p">):</span> |
| <span class="n">safe_dag_id</span> <span class="o">=</span> <span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_strip_unsafe_kubernetes_special_chars</span><span class="p">(</span> |
| <span class="n">dag_id</span><span class="p">)</span> |
| <span class="n">safe_task_id</span> <span class="o">=</span> <span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_strip_unsafe_kubernetes_special_chars</span><span class="p">(</span> |
| <span class="n">task_id</span><span class="p">)</span> |
| <span class="n">safe_uuid</span> <span class="o">=</span> <span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_strip_unsafe_kubernetes_special_chars</span><span class="p">(</span> |
| <span class="n">uuid4</span><span class="p">()</span><span class="o">.</span><span class="n">hex</span><span class="p">)</span> |
| <span class="k">return</span> <span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_make_safe_pod_id</span><span class="p">(</span><span class="n">safe_dag_id</span><span class="p">,</span> <span class="n">safe_task_id</span><span class="p">,</span> |
| <span class="n">safe_uuid</span><span class="p">)</span></div> |
| |
| <span class="nd">@staticmethod</span> |
| <div class="viewcode-block" id="AirflowKubernetesScheduler._label_safe_datestring_to_datetime"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler._label_safe_datestring_to_datetime">[docs]</a> <span class="k">def</span> <span class="nf">_label_safe_datestring_to_datetime</span><span class="p">(</span><span class="n">string</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not</span> |
| <span class="sd"> "_", let's</span> |
| <span class="sd"> replace ":" with "_"</span> |
| |
| <span class="sd"> :param string: str</span> |
| <span class="sd"> :return: datetime.datetime object</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="n">parser</span><span class="o">.</span><span class="n">parse</span><span class="p">(</span><span class="n">string</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'_plus_'</span><span class="p">,</span> <span class="s1">'+'</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s2">"_"</span><span class="p">,</span> <span class="s2">":"</span><span class="p">))</span></div> |
| |
| <span class="nd">@staticmethod</span> |
| <div class="viewcode-block" id="AirflowKubernetesScheduler._datetime_to_label_safe_datestring"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler._datetime_to_label_safe_datestring">[docs]</a> <span class="k">def</span> <span class="nf">_datetime_to_label_safe_datestring</span><span class="p">(</span><span class="n">datetime_obj</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> Kubernetes doesn't like ":" in labels, since ISO datetime format uses ":" but</span> |
| <span class="sd"> not "_" let's</span> |
| <span class="sd"> replace ":" with "_"</span> |
| <span class="sd"> :param datetime_obj: datetime.datetime object</span> |
| <span class="sd"> :return: ISO-like string representing the datetime</span> |
| <span class="sd"> """</span> |
| <span class="k">return</span> <span class="n">datetime_obj</span><span class="o">.</span><span class="n">isoformat</span><span class="p">()</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s2">":"</span><span class="p">,</span> <span class="s2">"_"</span><span class="p">)</span><span class="o">.</span><span class="n">replace</span><span class="p">(</span><span class="s1">'+'</span><span class="p">,</span> <span class="s1">'_plus_'</span><span class="p">)</span></div> |
| |
| <div class="viewcode-block" id="AirflowKubernetesScheduler._labels_to_key"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler._labels_to_key">[docs]</a> <span class="k">def</span> <span class="nf">_labels_to_key</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">labels</span><span class="p">):</span> |
| <span class="n">try_num</span> <span class="o">=</span> <span class="mi">1</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">try_num</span> <span class="o">=</span> <span class="nb">int</span><span class="p">(</span><span class="n">labels</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'try_number'</span><span class="p">,</span> <span class="s1">'1'</span><span class="p">))</span> |
| <span class="k">except</span> <span class="ne">ValueError</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span><span class="s2">"could not get try_number as an int: </span><span class="si">%s</span><span class="s2">"</span><span class="p">,</span> <span class="n">labels</span><span class="o">.</span><span class="n">get</span><span class="p">(</span><span class="s1">'try_number'</span><span class="p">,</span> <span class="s1">'1'</span><span class="p">))</span> |
| |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">dag_id</span> <span class="o">=</span> <span class="n">labels</span><span class="p">[</span><span class="s1">'dag_id'</span><span class="p">]</span> |
| <span class="n">task_id</span> <span class="o">=</span> <span class="n">labels</span><span class="p">[</span><span class="s1">'task_id'</span><span class="p">]</span> |
| <span class="n">ex_time</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_label_safe_datestring_to_datetime</span><span class="p">(</span><span class="n">labels</span><span class="p">[</span><span class="s1">'execution_date'</span><span class="p">])</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span> |
| <span class="s1">'Error while retrieving labels; labels: </span><span class="si">%s</span><span class="s1">; exception: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">labels</span><span class="p">,</span> <span class="n">e</span> |
| <span class="p">)</span> |
| <span class="k">return</span> <span class="kc">None</span> |
| |
| <span class="k">with</span> <span class="n">create_session</span><span class="p">()</span> <span class="k">as</span> <span class="n">session</span><span class="p">:</span> |
| <span class="n">tasks</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="n">session</span> |
| <span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TaskInstance</span><span class="p">)</span> |
| <span class="o">.</span><span class="n">filter_by</span><span class="p">(</span><span class="n">execution_date</span><span class="o">=</span><span class="n">ex_time</span><span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span> |
| <span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'Checking </span><span class="si">%s</span><span class="s1"> task instances.'</span><span class="p">,</span> |
| <span class="nb">len</span><span class="p">(</span><span class="n">tasks</span><span class="p">)</span> |
| <span class="p">)</span> |
| <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">tasks</span><span class="p">:</span> |
| <span class="k">if</span> <span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_make_safe_label_value</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">dag_id</span><span class="p">)</span> <span class="o">==</span> <span class="n">dag_id</span> <span class="ow">and</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_make_safe_label_value</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">)</span> <span class="o">==</span> <span class="n">task_id</span> <span class="ow">and</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="n">ex_time</span> |
| <span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'Found matching task </span><span class="si">%s</span><span class="s1">-</span><span class="si">%s</span><span class="s1"> (</span><span class="si">%s</span><span class="s1">) with current state of </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">execution_date</span><span class="p">,</span> <span class="n">task</span><span class="o">.</span><span class="n">state</span> |
| <span class="p">)</span> |
| <span class="n">dag_id</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">dag_id</span> |
| <span class="n">task_id</span> <span class="o">=</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span> |
| <span class="k">return</span> <span class="p">(</span><span class="n">dag_id</span><span class="p">,</span> <span class="n">task_id</span><span class="p">,</span> <span class="n">ex_time</span><span class="p">,</span> <span class="n">try_num</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warn</span><span class="p">(</span> |
| <span class="s1">'Failed to find and match task details to a pod; labels: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">labels</span> |
| <span class="p">)</span> |
| <span class="k">return</span> <span class="kc">None</span></div> |
| |
| <div class="viewcode-block" id="AirflowKubernetesScheduler.terminate"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.AirflowKubernetesScheduler.terminate">[docs]</a> <span class="k">def</span> <span class="nf">terminate</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">watcher_queue</span><span class="o">.</span><span class="n">join</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_manager</span><span class="o">.</span><span class="n">shutdown</span><span class="p">()</span></div></div> |
| |
| |
| <div class="viewcode-block" id="KubernetesExecutor"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor">[docs]</a><span class="k">class</span> <span class="nc">KubernetesExecutor</span><span class="p">(</span><span class="n">BaseExecutor</span><span class="p">,</span> <span class="n">LoggingMixin</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">__init__</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span> <span class="o">=</span> <span class="n">KubeConfig</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_manager</span> <span class="o">=</span> <span class="n">multiprocessing</span><span class="o">.</span><span class="n">Manager</span><span class="p">()</span> |
| <span class="nb">super</span><span class="p">(</span><span class="n">KubernetesExecutor</span><span class="p">,</span> <span class="bp">self</span><span class="p">)</span><span class="o">.</span><span class="fm">__init__</span><span class="p">(</span><span class="n">parallelism</span><span class="o">=</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">parallelism</span><span class="p">)</span> |
| |
| <span class="nd">@provide_session</span> |
| <div class="viewcode-block" id="KubernetesExecutor.clear_not_launched_queued_tasks"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.clear_not_launched_queued_tasks">[docs]</a> <span class="k">def</span> <span class="nf">clear_not_launched_queued_tasks</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">session</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="sd">"""</span> |
| <span class="sd"> If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or</span> |
| <span class="sd"> may not</span> |
| <span class="sd"> have been launched Thus, on starting up the scheduler let's check every</span> |
| <span class="sd"> "Queued" task to</span> |
| <span class="sd"> see if it has been launched (ie: if there is a corresponding pod on kubernetes)</span> |
| |
| <span class="sd"> If it has been launched then do nothing, otherwise reset the state to "None" so</span> |
| <span class="sd"> the task</span> |
| <span class="sd"> will be rescheduled</span> |
| |
| <span class="sd"> This will not be necessary in a future version of airflow in which there is</span> |
| <span class="sd"> proper support</span> |
| <span class="sd"> for State.LAUNCHED</span> |
| <span class="sd"> """</span> |
| <span class="n">queued_tasks</span> <span class="o">=</span> <span class="n">session</span>\ |
| <span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TaskInstance</span><span class="p">)</span>\ |
| <span class="o">.</span><span class="n">filter</span><span class="p">(</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">state</span> <span class="o">==</span> <span class="n">State</span><span class="o">.</span><span class="n">QUEUED</span><span class="p">)</span><span class="o">.</span><span class="n">all</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'When executor started up, found </span><span class="si">%s</span><span class="s1"> queued task instances'</span><span class="p">,</span> |
| <span class="nb">len</span><span class="p">(</span><span class="n">queued_tasks</span><span class="p">)</span> |
| <span class="p">)</span> |
| |
| <span class="k">for</span> <span class="n">task</span> <span class="ow">in</span> <span class="n">queued_tasks</span><span class="p">:</span> |
| <span class="n">dict_string</span> <span class="o">=</span> <span class="p">(</span> |
| <span class="s2">"dag_id=</span><span class="si">{}</span><span class="s2">,task_id=</span><span class="si">{}</span><span class="s2">,execution_date=</span><span class="si">{}</span><span class="s2">,airflow-worker=</span><span class="si">{}</span><span class="s2">"</span><span class="o">.</span><span class="n">format</span><span class="p">(</span> |
| <span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_make_safe_label_value</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">dag_id</span><span class="p">),</span> |
| <span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_make_safe_label_value</span><span class="p">(</span><span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">),</span> |
| <span class="n">AirflowKubernetesScheduler</span><span class="o">.</span><span class="n">_datetime_to_label_safe_datestring</span><span class="p">(</span> |
| <span class="n">task</span><span class="o">.</span><span class="n">execution_date</span> |
| <span class="p">),</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span> |
| <span class="p">)</span> |
| <span class="p">)</span> |
| <span class="n">kwargs</span> <span class="o">=</span> <span class="nb">dict</span><span class="p">(</span><span class="n">label_selector</span><span class="o">=</span><span class="n">dict_string</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="p">:</span> |
| <span class="k">for</span> <span class="n">key</span><span class="p">,</span> <span class="n">value</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="o">.</span><span class="n">iteritems</span><span class="p">():</span> |
| <span class="n">kwargs</span><span class="p">[</span><span class="n">key</span><span class="p">]</span> <span class="o">=</span> <span class="n">value</span> |
| <span class="n">pod_list</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="o">.</span><span class="n">list_namespaced_pod</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_namespace</span><span class="p">,</span> <span class="o">**</span><span class="n">kwargs</span><span class="p">)</span> |
| <span class="k">if</span> <span class="nb">len</span><span class="p">(</span><span class="n">pod_list</span><span class="o">.</span><span class="n">items</span><span class="p">)</span> <span class="o">==</span> <span class="mi">0</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'TaskInstance: </span><span class="si">%s</span><span class="s1"> found in queued state but was not launched, '</span> |
| <span class="s1">'rescheduling'</span><span class="p">,</span> <span class="n">task</span> |
| <span class="p">)</span> |
| <span class="n">session</span><span class="o">.</span><span class="n">query</span><span class="p">(</span><span class="n">TaskInstance</span><span class="p">)</span><span class="o">.</span><span class="n">filter</span><span class="p">(</span> |
| <span class="n">TaskInstance</span><span class="o">.</span><span class="n">dag_id</span> <span class="o">==</span> <span class="n">task</span><span class="o">.</span><span class="n">dag_id</span><span class="p">,</span> |
| <span class="n">TaskInstance</span><span class="o">.</span><span class="n">task_id</span> <span class="o">==</span> <span class="n">task</span><span class="o">.</span><span class="n">task_id</span><span class="p">,</span> |
| <span class="n">TaskInstance</span><span class="o">.</span><span class="n">execution_date</span> <span class="o">==</span> <span class="n">task</span><span class="o">.</span><span class="n">execution_date</span> |
| <span class="p">)</span><span class="o">.</span><span class="n">update</span><span class="p">({</span><span class="n">TaskInstance</span><span class="o">.</span><span class="n">state</span><span class="p">:</span> <span class="n">State</span><span class="o">.</span><span class="n">NONE</span><span class="p">})</span></div> |
| |
| <div class="viewcode-block" id="KubernetesExecutor._inject_secrets"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor._inject_secrets">[docs]</a> <span class="k">def</span> <span class="nf">_inject_secrets</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">def</span> <span class="nf">_create_or_update_secret</span><span class="p">(</span><span class="n">secret_name</span><span class="p">,</span> <span class="n">secret_path</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="o">.</span><span class="n">create_namespaced_secret</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">executor_namespace</span><span class="p">,</span> <span class="n">kubernetes</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">V1Secret</span><span class="p">(</span> |
| <span class="n">data</span><span class="o">=</span><span class="p">{</span> |
| <span class="s1">'key.json'</span><span class="p">:</span> <span class="n">base64</span><span class="o">.</span><span class="n">b64encode</span><span class="p">(</span><span class="nb">open</span><span class="p">(</span><span class="n">secret_path</span><span class="p">,</span> <span class="s1">'r'</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">())},</span> |
| <span class="n">metadata</span><span class="o">=</span><span class="n">kubernetes</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">V1ObjectMeta</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="n">secret_name</span><span class="p">)),</span> |
| <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="p">)</span> |
| <span class="k">except</span> <span class="n">ApiException</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="k">if</span> <span class="n">e</span><span class="o">.</span><span class="n">status</span> <span class="o">==</span> <span class="mi">409</span><span class="p">:</span> |
| <span class="k">return</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="o">.</span><span class="n">replace_namespaced_secret</span><span class="p">(</span> |
| <span class="n">secret_name</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">executor_namespace</span><span class="p">,</span> |
| <span class="n">kubernetes</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">V1Secret</span><span class="p">(</span> |
| <span class="n">data</span><span class="o">=</span><span class="p">{</span><span class="s1">'key.json'</span><span class="p">:</span> <span class="n">base64</span><span class="o">.</span><span class="n">b64encode</span><span class="p">(</span> |
| <span class="nb">open</span><span class="p">(</span><span class="n">secret_path</span><span class="p">,</span> <span class="s1">'r'</span><span class="p">)</span><span class="o">.</span><span class="n">read</span><span class="p">())},</span> |
| <span class="n">metadata</span><span class="o">=</span><span class="n">kubernetes</span><span class="o">.</span><span class="n">client</span><span class="o">.</span><span class="n">V1ObjectMeta</span><span class="p">(</span><span class="n">name</span><span class="o">=</span><span class="n">secret_name</span><span class="p">)),</span> |
| <span class="o">**</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">kube_client_request_args</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span> |
| <span class="s1">'Exception while trying to inject secret. '</span> |
| <span class="s1">'Secret name: </span><span class="si">%s</span><span class="s1">, error details: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">secret_name</span><span class="p">,</span> <span class="n">e</span> |
| <span class="p">)</span> |
| <span class="k">raise</span> |
| |
| <span class="c1"># For each GCP service account key, inject it as a secret in executor</span> |
| <span class="c1"># namespace with the specific secret name configured in the airflow.cfg.</span> |
| <span class="c1"># We let exceptions to pass through to users.</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">gcp_service_account_keys</span><span class="p">:</span> |
| <span class="n">name_path_pair_list</span> <span class="o">=</span> <span class="p">[</span> |
| <span class="p">{</span><span class="s1">'name'</span><span class="p">:</span> <span class="n">account_spec</span><span class="o">.</span><span class="n">strip</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'='</span><span class="p">)[</span><span class="mi">0</span><span class="p">],</span> |
| <span class="s1">'path'</span><span class="p">:</span> <span class="n">account_spec</span><span class="o">.</span><span class="n">strip</span><span class="p">()</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">'='</span><span class="p">)[</span><span class="mi">1</span><span class="p">]}</span> |
| <span class="k">for</span> <span class="n">account_spec</span> <span class="ow">in</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">gcp_service_account_keys</span><span class="o">.</span><span class="n">split</span><span class="p">(</span><span class="s1">','</span><span class="p">)]</span> |
| <span class="k">for</span> <span class="n">service_account</span> <span class="ow">in</span> <span class="n">name_path_pair_list</span><span class="p">:</span> |
| <span class="n">_create_or_update_secret</span><span class="p">(</span><span class="n">service_account</span><span class="p">[</span><span class="s1">'name'</span><span class="p">],</span> <span class="n">service_account</span><span class="p">[</span><span class="s1">'path'</span><span class="p">])</span></div> |
| |
| <div class="viewcode-block" id="KubernetesExecutor.start"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.start">[docs]</a> <span class="k">def</span> <span class="nf">start</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Start Kubernetes executor'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span> <span class="o">=</span> <span class="n">KubeWorkerIdentifier</span><span class="o">.</span><span class="n">get_or_create_current_kube_worker_uuid</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'Start with worker_uuid: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span><span class="p">)</span> |
| <span class="c1"># always need to reset resource version since we don't know</span> |
| <span class="c1"># when we last started, note for behavior below</span> |
| <span class="c1"># https://github.com/kubernetes-client/python/blob/master/kubernetes/docs</span> |
| <span class="c1"># /CoreV1Api.md#list_namespaced_pod</span> |
| <span class="n">KubeResourceVersion</span><span class="o">.</span><span class="n">reset_resource_version</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_manager</span><span class="o">.</span><span class="n">Queue</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">_manager</span><span class="o">.</span><span class="n">Queue</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span> <span class="o">=</span> <span class="n">get_kube_client</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span> <span class="o">=</span> <span class="n">AirflowKubernetesScheduler</span><span class="p">(</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span><span class="p">,</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_client</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">worker_uuid</span> |
| <span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_inject_secrets</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">clear_not_launched_queued_tasks</span><span class="p">()</span></div> |
| |
| <div class="viewcode-block" id="KubernetesExecutor.execute_async"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.execute_async">[docs]</a> <span class="k">def</span> <span class="nf">execute_async</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">command</span><span class="p">,</span> <span class="n">queue</span><span class="o">=</span><span class="kc">None</span><span class="p">,</span> <span class="n">executor_config</span><span class="o">=</span><span class="kc">None</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span> |
| <span class="s1">'Add task </span><span class="si">%s</span><span class="s1"> with command </span><span class="si">%s</span><span class="s1"> with executor_config </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> |
| <span class="n">key</span><span class="p">,</span> <span class="n">command</span><span class="p">,</span> <span class="n">executor_config</span> |
| <span class="p">)</span> |
| <span class="n">kube_executor_config</span> <span class="o">=</span> <span class="n">KubernetesExecutorConfig</span><span class="o">.</span><span class="n">from_dict</span><span class="p">(</span><span class="n">executor_config</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span><span class="o">.</span><span class="n">put</span><span class="p">((</span><span class="n">key</span><span class="p">,</span> <span class="n">command</span><span class="p">,</span> <span class="n">kube_executor_config</span><span class="p">))</span></div> |
| |
| <div class="viewcode-block" id="KubernetesExecutor.sync"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.sync">[docs]</a> <span class="k">def</span> <span class="nf">sync</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">running</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'self.running: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">running</span><span class="p">)</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">queued_tasks</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'self.queued: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="bp">self</span><span class="o">.</span><span class="n">queued_tasks</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span><span class="o">.</span><span class="n">sync</span><span class="p">()</span> |
| |
| <span class="n">last_resource_version</span> <span class="o">=</span> <span class="kc">None</span> |
| <span class="k">while</span> <span class="kc">True</span><span class="p">:</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">results</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span><span class="o">.</span><span class="n">get_nowait</span><span class="p">()</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">key</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">,</span> <span class="n">resource_version</span> <span class="o">=</span> <span class="n">results</span> |
| <span class="n">last_resource_version</span> <span class="o">=</span> <span class="n">resource_version</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Changing state of </span><span class="si">%s</span><span class="s1"> to </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="n">results</span><span class="p">,</span> <span class="n">state</span><span class="p">)</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_change_state</span><span class="p">(</span><span class="n">key</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">Exception</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">exception</span><span class="p">(</span><span class="s1">'Exception: </span><span class="si">%s</span><span class="s1"> when attempting '</span> <span class="o">+</span> |
| <span class="s1">'to change state of </span><span class="si">%s</span><span class="s1"> to </span><span class="si">%s</span><span class="s1">, re-queueing.'</span><span class="p">,</span> <span class="n">e</span><span class="p">,</span> <span class="n">results</span><span class="p">,</span> <span class="n">state</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span><span class="o">.</span><span class="n">put</span><span class="p">(</span><span class="n">results</span><span class="p">)</span> |
| <span class="k">finally</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span><span class="o">.</span><span class="n">task_done</span><span class="p">()</span> |
| <span class="k">except</span> <span class="n">Empty</span><span class="p">:</span> |
| <span class="k">break</span> |
| |
| <span class="n">KubeResourceVersion</span><span class="o">.</span><span class="n">checkpoint_resource_version</span><span class="p">(</span><span class="n">last_resource_version</span><span class="p">)</span> |
| |
| <span class="k">for</span> <span class="n">_</span> <span class="ow">in</span> <span class="nb">range</span><span class="p">(</span><span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">worker_pods_creation_batch_size</span><span class="p">):</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="n">task</span> <span class="o">=</span> <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span><span class="o">.</span><span class="n">get_nowait</span><span class="p">()</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span><span class="o">.</span><span class="n">run_next</span><span class="p">(</span><span class="n">task</span><span class="p">)</span> |
| <span class="k">except</span> <span class="n">ApiException</span> <span class="k">as</span> <span class="n">e</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">warning</span><span class="p">(</span><span class="s1">'ApiException when attempting to run task, re-queueing. '</span> |
| <span class="s1">'Message: </span><span class="si">%s</span><span class="s1">'</span> <span class="o">%</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">e</span><span class="o">.</span><span class="n">body</span><span class="p">)[</span><span class="s1">'message'</span><span class="p">])</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span><span class="o">.</span><span class="n">put</span><span class="p">(</span><span class="n">task</span><span class="p">)</span> |
| <span class="k">finally</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span><span class="o">.</span><span class="n">task_done</span><span class="p">()</span> |
| <span class="k">except</span> <span class="n">Empty</span><span class="p">:</span> |
| <span class="k">break</span></div> |
| |
| <div class="viewcode-block" id="KubernetesExecutor._change_state"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor._change_state">[docs]</a> <span class="k">def</span> <span class="nf">_change_state</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">key</span><span class="p">,</span> <span class="n">state</span><span class="p">,</span> <span class="n">pod_id</span><span class="p">):</span> |
| <span class="k">if</span> <span class="n">state</span> <span class="o">!=</span> <span class="n">State</span><span class="o">.</span><span class="n">RUNNING</span><span class="p">:</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_config</span><span class="o">.</span><span class="n">delete_worker_pods</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span><span class="o">.</span><span class="n">delete_pod</span><span class="p">(</span><span class="n">pod_id</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Deleted pod: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">key</span><span class="p">))</span> |
| <span class="k">try</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">running</span><span class="o">.</span><span class="n">pop</span><span class="p">(</span><span class="n">key</span><span class="p">)</span> |
| <span class="k">except</span> <span class="ne">KeyError</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">debug</span><span class="p">(</span><span class="s1">'Could not find key: </span><span class="si">%s</span><span class="s1">'</span><span class="p">,</span> <span class="nb">str</span><span class="p">(</span><span class="n">key</span><span class="p">))</span> |
| <span class="k">pass</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">event_buffer</span><span class="p">[</span><span class="n">key</span><span class="p">]</span> <span class="o">=</span> <span class="n">state</span></div> |
| |
| <div class="viewcode-block" id="KubernetesExecutor.end"><a class="viewcode-back" href="../../../../_api/airflow/contrib/executors/kubernetes_executor/index.html#airflow.contrib.executors.kubernetes_executor.KubernetesExecutor.end">[docs]</a> <span class="k">def</span> <span class="nf">end</span><span class="p">(</span><span class="bp">self</span><span class="p">):</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">log</span><span class="o">.</span><span class="n">info</span><span class="p">(</span><span class="s1">'Shutting down Kubernetes executor'</span><span class="p">)</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">task_queue</span><span class="o">.</span><span class="n">join</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">result_queue</span><span class="o">.</span><span class="n">join</span><span class="p">()</span> |
| <span class="k">if</span> <span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span><span class="p">:</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">kube_scheduler</span><span class="o">.</span><span class="n">terminate</span><span class="p">()</span> |
| <span class="bp">self</span><span class="o">.</span><span class="n">_manager</span><span class="o">.</span><span class="n">shutdown</span><span class="p">()</span></div></div> |
| </pre></div> |
| |
| </div> |
| |
| </div> |
| <footer> |
| |
| |
| <hr/> |
| |
| <div role="contentinfo"> |
| <p> |
| |
| </p> |
| </div> |
| Built with <a href="http://sphinx-doc.org/">Sphinx</a> using a <a href="https://github.com/rtfd/sphinx_rtd_theme">theme</a> provided by <a href="https://readthedocs.org">Read the Docs</a>. |
| |
| </footer> |
| |
| </div> |
| </div> |
| |
| </section> |
| |
| </div> |
| |
| |
| |
| <script type="text/javascript"> |
| jQuery(function () { |
| SphinxRtdTheme.Navigation.enable(true); |
| }); |
| </script> |
| |
| |
| |
| |
| |
| |
| </body> |
| </html> |