| :mod:`airflow.executors.kubernetes_executor` |
| ============================================ |
| |
| .. py:module:: airflow.executors.kubernetes_executor |
| |
| .. autoapi-nested-parse:: |
| |
| KubernetesExecutor |
| |
| .. seealso:: |
| For more information on how the KubernetesExecutor works, take a look at the guide: |
| :ref:`executor:KubernetesExecutor` |
| |
| |
| |
| Module Contents |
| --------------- |
| |
| .. py:class:: KubeConfig |
| |
| Configuration for Kubernetes |
| |
| .. attribute:: core_section |
| :annotation: = core |
| |
| |
| |
| .. attribute:: kubernetes_section |
| :annotation: = kubernetes |
| |
| |
| |
| |
| .. method:: _get_security_context_val(self, scontext) |
| |
| |
| |
| |
| .. method:: _validate(self) |
| |
| |
| |
| |
| .. py:class:: KubernetesJobWatcher(namespace, multi_namespace_mode, watcher_queue, resource_version, worker_uuid, kube_config) |
| |
| Bases: :class:`multiprocessing.Process`, :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| Watches for Kubernetes jobs |
| |
| |
| .. method:: run(self) |
| |
| Performs watching |
| |
| |
| |
| |
| .. method:: _run(self, kube_client, resource_version, worker_uuid, kube_config) |
| |
| |
| |
| |
| .. method:: process_error(self, event) |
| |
| Process error response |
| |
| |
| |
| |
| .. method:: process_status(self, pod_id, namespace, status, labels, resource_version, event) |
| |
| Process status response |
| |
| |
| |
| |
| .. py:class:: AirflowKubernetesScheduler(kube_config, task_queue, result_queue, kube_client, worker_uuid) |
| |
| Bases: :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| Airflow Scheduler for Kubernetes |
| |
| |
| .. method:: _make_kube_watcher(self) |
| |
| |
| |
| |
| .. method:: _health_check_kube_watcher(self) |
| |
| |
| |
| |
| .. method:: run_next(self, next_job) |
| |
The ``run_next`` command checks the ``task_queue`` for any un-run jobs.
It then creates a unique job ID, launches that job in the cluster,
and stores relevant info in the ``current_jobs`` map so we can track the job's
status.
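The three steps described above (create a unique ID, launch, record for tracking) can be sketched in plain Python. This is a minimal, hypothetical illustration, not Airflow's code: the job arrives as ``next_job`` (matching the signature above) in an assumed ``(key, command)`` shape, the cluster launch is stood in for by a caller-supplied ``launch`` callable, and ``current_jobs`` is modelled as a plain ``dict``.

.. code-block:: python

   import uuid
   from typing import Callable, Dict, List, Tuple

   # Hypothetical shapes: a (dag_id, task_id) key and the command to run.
   TaskKey = Tuple[str, str]
   Job = Tuple[TaskKey, List[str]]


   def run_next_sketch(
       next_job: Job,
       current_jobs: Dict[str, TaskKey],
       launch: Callable[[str, List[str]], None],
   ) -> str:
       """Create a unique job ID, launch the job, and record it for tracking."""
       key, command = next_job
       # A random UUID suffix keeps IDs unique even for the same task key.
       job_id = f"{key[0]}-{key[1]}-{uuid.uuid4().hex}"
       # Stand-in for creating the pod in the cluster (assumption).
       launch(job_id, command)
       # Remember which task this job belongs to so its status can be tracked.
       current_jobs[job_id] = key
       return job_id

Passing ``launch`` in keeps the sketch testable without a cluster; the real scheduler talks to the Kubernetes API instead.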
| |
| |
| |
| |
| .. method:: delete_pod(self, pod_id, namespace) |
| |
Deletes the pod ``pod_id`` in ``namespace``.
| |
| |
| |
| |
| .. method:: sync(self) |
| |
The sync function checks the status of all currently running Kubernetes jobs.
If a job is completed, its status is placed in the result queue to
be sent back to the scheduler.
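The drain-and-forward pattern described above can be sketched as follows. This is a hypothetical illustration: the ``(pod_id, key, state)`` update tuples, the state names in ``TERMINAL_STATES`` and the use of ``queue.Queue`` are assumptions chosen for simplicity, not Airflow's actual types.

.. code-block:: python

   import queue

   # Hypothetical state names; Airflow's real states are not modelled here.
   TERMINAL_STATES = {"success", "failed"}


   def sync_sketch(watcher_queue: queue.Queue, result_queue: queue.Queue) -> int:
       """Drain pending watcher updates; forward completed jobs to result_queue."""
       forwarded = 0
       while True:
           try:
               # Non-blocking read: stop as soon as no update is pending.
               pod_id, key, state = watcher_queue.get_nowait()
           except queue.Empty:
               break
           if state in TERMINAL_STATES:
               # Completed job: hand its status back to the scheduler side.
               result_queue.put((key, state, pod_id))
               forwarded += 1
       return forwarded

Reading with ``get_nowait`` means a sync pass never blocks the scheduler loop when the watcher has nothing new to report.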
| |
| |
| |
| |
| .. method:: process_watcher_task(self, task) |
| |
Processes a task received from the watcher.
| |
| |
| |
| |
| .. staticmethod:: _strip_unsafe_kubernetes_special_chars(string) |
| |
Kubernetes only supports lowercase alphanumeric characters, "-" and "." in
the pod name. However, there are special rules about how "-" and "." can be
used, so let's only keep alphanumeric characters. See here for details:
https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
| |
| :param string: The requested Pod name |
| :return: ``str`` Pod name stripped of any unsafe characters |
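A minimal sketch of the stripping rule, assuming the name is lowercased first (the docstring only says Kubernetes supports lowercase characters) and that only ASCII letters and digits should survive. The function name is hypothetical.

.. code-block:: python

   import re


   def strip_unsafe_chars_sketch(string: str) -> str:
       """Lowercase the name and keep only ASCII letters and digits."""
       # Lowercasing first lets uppercase letters survive instead of being dropped;
       # the explicit [a-z0-9] class also rejects non-ASCII letters.
       return re.sub(r"[^a-z0-9]", "", string.lower())

For example, ``strip_unsafe_chars_sketch("My_DAG.v2-test")`` returns ``"mydagv2test"``: "_", "." and "-" are all removed rather than validated.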
| |
| |
| |
| |
| .. staticmethod:: _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid) |
| |
Kubernetes pod names must be <= 253 characters and must match the following
regex for validation:
| ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`` |
| |
| :param safe_dag_id: a dag_id with only alphanumeric characters |
| :param safe_task_id: a task_id with only alphanumeric characters |
| :param safe_uuid: a uuid |
| :return: ``str`` valid Pod name of appropriate length |
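The length limit and regex above can be combined into a sketch. The truncation strategy here (concatenate the two IDs, cut them to fit, keep the whole UUID after a "-") is an assumption, since the docstring does not spell it out; the function name is hypothetical.

.. code-block:: python

   import re

   MAX_POD_ID_LEN = 253  # length limit quoted in the docstring above
   POD_NAME_RE = re.compile(
       r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
   )


   def make_safe_pod_id_sketch(safe_dag_id: str, safe_task_id: str, safe_uuid: str) -> str:
       """Join the IDs, truncate to fit the limit, and append "-<uuid>"."""
       safe_key = safe_dag_id + safe_task_id
       # Reserve room for the "-" separator and the full uuid suffix.
       budget = MAX_POD_ID_LEN - len(safe_uuid) - 1
       pod_id = f"{safe_key[:budget]}-{safe_uuid}"
       # Reject anything the Kubernetes name rules would refuse (e.g. empty IDs).
       if len(pod_id) > MAX_POD_ID_LEN or not POD_NAME_RE.fullmatch(pod_id):
           raise ValueError(f"cannot build a valid pod name from {pod_id!r}")
       return pod_id

Keeping the UUID intact and truncating the readable part preserves uniqueness even when long DAG and task IDs are cut.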
| |
| |
| |
| |
| .. staticmethod:: _create_pod_id(dag_id, task_id) |
| |
| |
| |
| |
| .. staticmethod:: _label_safe_datestring_to_datetime(string) |
| |
Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not
"_", so label-safe datestrings replace ":" with "_"; this function reverses
that substitution and parses the result.

:param string: label-safe datestring
| :return: datetime.datetime object |
| |
| |
| |
| |
| .. staticmethod:: _datetime_to_label_safe_datestring(datetime_obj) |
| |
Kubernetes doesn't permit ":" in labels. Since ISO datetime format uses ":" but
not "_", let's replace ":" with "_".
| |
| :param datetime_obj: datetime.datetime object |
| :return: ISO-like string representing the datetime |
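The two label conversions can be sketched as a round trip. Beyond the ":" to "_" rule in the docstrings, this sketch assumes a "+" in a timezone offset is encoded as ``_plus_``, because Kubernetes label values may contain only alphanumerics, "-", "_" and "."; the function names are hypothetical.

.. code-block:: python

   from datetime import datetime, timezone


   def datetime_to_label_safe_datestring_sketch(datetime_obj: datetime) -> str:
       """ISO format with ":" -> "_" and "+" -> "_plus_" (assumed encoding)."""
       return datetime_obj.isoformat().replace(":", "_").replace("+", "_plus_")


   def label_safe_datestring_to_datetime_sketch(string: str) -> datetime:
       """Undo the substitutions above, then parse the ISO string."""
       # "_plus_" must be restored before the remaining "_" become ":".
       return datetime.fromisoformat(string.replace("_plus_", "+").replace("_", ":"))


   # Example: an aware UTC timestamp survives the round trip.
   stamp = datetime(2020, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
   label = datetime_to_label_safe_datestring_sketch(stamp)
   assert label_safe_datestring_to_datetime_sketch(label) == stamp

Here ``label`` is ``2020-01-02T03_04_05_plus_00_00``. The reversal is unambiguous only because ``isoformat`` never emits "_" itself.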
| |
| |
| |
| |
| .. method:: _labels_to_key(self, labels) |
| |
| |
| |
| |
| .. method:: _flush_watcher_queue(self) |
| |
| |
| |
| |
| .. method:: terminate(self) |
| |
Terminates the watcher.
| |
| |
| |
| |
| .. py:class:: KubernetesExecutor |
| |
| Bases: :class:`airflow.executors.base_executor.BaseExecutor`, :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| Executor for Kubernetes |
| |
| |
| .. method:: clear_not_launched_queued_tasks(self, session=None) |
| |
If the Airflow scheduler restarts with pending "Queued" tasks, the tasks may or
may not have been launched. Thus, on starting up the scheduler, let's check
every "Queued" task to see if it has been launched (i.e. if there is a
corresponding pod on Kubernetes).

If it has been launched, then do nothing; otherwise, reset the state to "None"
so the task will be rescheduled.

This will not be necessary in a future version of Airflow in which there is
proper support for ``State.LAUNCHED``.
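The reconciliation rule above can be sketched with in-memory data. This is a hypothetical illustration: the lookups against the metadata database and the Kubernetes API are replaced by a ``dict`` of task states and a ``set`` of keys that have a matching pod, and states are plain strings rather than Airflow's ``State`` constants.

.. code-block:: python

   from typing import Dict, List, Optional, Set, Tuple

   # Hypothetical task key: (dag_id, task_id, label-safe execution date).
   TaskKey = Tuple[str, str, str]


   def reset_unlaunched_queued_tasks_sketch(
       task_states: Dict[TaskKey, Optional[str]],
       keys_with_pods: Set[TaskKey],
   ) -> List[TaskKey]:
       """Reset "queued" tasks that have no matching pod back to None."""
       reset: List[TaskKey] = []
       for key, state in task_states.items():
           if state == "queued" and key not in keys_with_pods:
               # Never launched: clearing the state lets the scheduler retry it.
               task_states[key] = None
               reset.append(key)
       return reset

Tasks that already have a pod are left untouched, so a restart never launches a second pod for the same task.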
| |
| |
| |
| |
| .. method:: _inject_secrets(self) |
| |
| |
| |
| |
| .. method:: start(self) |
| |
| Starts the executor |
| |
| |
| |
| |
| .. method:: execute_async(self, key, command, queue=None, executor_config=None) |
| |
| Executes task asynchronously |
| |
| |
| |
| |
| .. method:: sync(self) |
| |
| Synchronize task state. |
| |
| |
| |
| |
| .. method:: _change_state(self, key, state, pod_id, namespace) |
| |
| |
| |
| |
| .. method:: _flush_task_queue(self) |
| |
| |
| |
| |
| .. method:: _flush_result_queue(self) |
| |
| |
| |
| |
| .. method:: end(self) |
| |
| Called when the executor shuts down |
| |
| |
| |
| |