:mod:`airflow.executors.kubernetes_executor`
============================================
.. py:module:: airflow.executors.kubernetes_executor
.. autoapi-nested-parse::
KubernetesExecutor
.. seealso::
For more information on how the KubernetesExecutor works, take a look at the guide:
:ref:`executor:KubernetesExecutor`
Module Contents
---------------
.. data:: KubernetesJobType
.. data:: KubernetesResultsType
.. data:: KubernetesWatchType
.. py:class:: ResourceVersion
Singleton for tracking resourceVersion from Kubernetes
.. attribute:: _instance
.. attribute:: resource_version
:annotation: = 0
.. classmethod:: __new__(cls)
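The singleton behaviour described above can be sketched as follows. This is a stand-alone illustration, not the module's source: only the names ``_instance`` and ``resource_version`` come from the listing, and the initial value ``"0"`` (a string) is an assumption.

```python
class ResourceVersion:
    """Illustrative singleton that remembers the last seen resourceVersion."""

    _instance = None
    resource_version = "0"  # assumed initial value

    def __new__(cls):
        # Create the single shared instance on first use, then reuse it.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance


first = ResourceVersion()
second = ResourceVersion()

# Both names refer to the same object, so an update made through one
# is visible through the other.
first.resource_version = "12345"
```

Because every call to ``ResourceVersion()`` returns the same object, a restarted watcher could read the last recorded version from it and resume from there.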
.. py:class:: KubernetesJobWatcher(namespace: Optional[str], multi_namespace_mode: bool, watcher_queue: 'Queue[KubernetesWatchType]', resource_version: Optional[str], scheduler_job_id: Optional[str], kube_config: Configuration)
Bases: :class:`multiprocessing.Process`, :class:`airflow.utils.log.logging_mixin.LoggingMixin`
Watches for Kubernetes jobs
.. method:: run(self)
Performs watching
.. method:: _run(self, kube_client: client.CoreV1Api, resource_version: Optional[str], scheduler_job_id: str, kube_config: Any)
.. method:: process_error(self, event: Any)
Process error response
.. method:: process_status(self, pod_id: str, namespace: str, status: str, annotations: Dict[str, str], resource_version: str, event: Any)
Process status response
.. py:class:: AirflowKubernetesScheduler(kube_config: Any, task_queue: 'Queue[KubernetesJobType]', result_queue: 'Queue[KubernetesResultsType]', kube_client: client.CoreV1Api, scheduler_job_id: str)
Bases: :class:`airflow.utils.log.logging_mixin.LoggingMixin`
Airflow Scheduler for Kubernetes
.. method:: _make_kube_watcher(self)
.. method:: _health_check_kube_watcher(self)
.. method:: run_next(self, next_job: KubernetesJobType)
The run_next command will check the task_queue for any un-run jobs.
It will then create a unique job-id, launch that job in the cluster,
and store relevant info in the ``current_jobs`` map so we can track the job's
status.
.. method:: delete_pod(self, pod_id: str, namespace: str)
Deletes the pod.
.. method:: sync(self)
The sync function checks the status of all currently running Kubernetes jobs.
If a job has completed, its status is placed in the result queue to
be sent back to the scheduler.
:return:
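A minimal sketch of the forwarding step described above, using only the standard library. The function name ``forward_finished``, the ``(key, state)`` tuple shape, and the set of finished states are hypothetical and chosen for illustration; they are not taken from the module.

```python
from queue import Empty, Queue

# Hypothetical set of states that count as "completed" in this sketch.
FINISHED_STATES = {"success", "failed"}


def forward_finished(watcher_queue: Queue, result_queue: Queue) -> int:
    """Drain watcher_queue without blocking and forward finished jobs.

    Returns the number of updates placed on result_queue.
    """
    forwarded = 0
    while True:
        try:
            key, state = watcher_queue.get_nowait()
        except Empty:
            # Nothing left to process in this sync cycle.
            break
        try:
            if state in FINISHED_STATES:
                result_queue.put((key, state))
                forwarded += 1
        finally:
            watcher_queue.task_done()
    return forwarded


watcher_queue = Queue()
result_queue = Queue()
watcher_queue.put((("example_dag", "task_a"), "running"))
watcher_queue.put((("example_dag", "task_b"), "success"))
forwarded_count = forward_finished(watcher_queue, result_queue)
```

Using ``get_nowait`` keeps each cycle short: the loop ends as soon as the queue is empty instead of waiting for new events.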
.. method:: process_watcher_task(self, task: KubernetesWatchType)
Process the task by watcher.
.. method:: _annotations_to_key(self, annotations: Dict[str, str])
.. staticmethod:: _make_safe_pod_id(safe_dag_id: str, safe_task_id: str, safe_uuid: str)
Kubernetes pod names must be at most 253 characters long and must match the
following validation regex:
``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
:param safe_dag_id: a dag_id with only alphanumeric characters
:param safe_task_id: a task_id with only alphanumeric characters
:param safe_uuid: a uuid
:return: ``str`` valid Pod name of appropriate length
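One way to satisfy both constraints is to truncate the DAG and task portion while keeping the UUID intact. The sketch below assumes the inputs are already lowercase alphanumeric, as the parameter descriptions require; the truncation strategy and the ``-`` separator are assumptions for illustration, not a statement of how the method is implemented.

```python
import re

MAX_POD_ID_LEN = 253
POD_NAME_RE = re.compile(
    r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$"
)


def make_safe_pod_id(safe_dag_id: str, safe_task_id: str, safe_uuid: str) -> str:
    """Join the parts, truncating the prefix so the result fits in 253 chars."""
    # Reserve room for the "-" separator and the uuid, which must stay whole
    # so that the name remains unique.
    prefix_budget = MAX_POD_ID_LEN - len(safe_uuid) - 1
    prefix = (safe_dag_id + safe_task_id)[:prefix_budget]
    return f"{prefix}-{safe_uuid}"


short_pod_id = make_safe_pod_id("exampledag", "exampletask", "0a1b2c3d")
long_pod_id = make_safe_pod_id("a" * 300, "b" * 300, "0a1b2c3d")
```

Truncating the prefix rather than the UUID means two long DAG or task names that share a prefix still produce distinct pod names.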
.. method:: _flush_watcher_queue(self)
.. method:: terminate(self)
Terminates the watcher.
.. function:: get_base_pod_from_template(pod_template_file: Optional[str], kube_config: Any) -> k8s.V1Pod
Reads either the pod_template_file set in the executor_config or the base pod_template_file
set in the airflow.cfg to craft a "base pod" that will be used by the KubernetesExecutor.
:param pod_template_file: absolute path to a pod_template_file.yaml or None
:param kube_config: The KubeConfig class generated by airflow that contains all kube metadata
:return: a V1Pod that can be used as the base pod for k8s tasks
.. py:class:: KubernetesExecutor
Bases: :class:`airflow.executors.base_executor.BaseExecutor`, :class:`airflow.utils.log.logging_mixin.LoggingMixin`
Executor for Kubernetes
.. method:: clear_not_launched_queued_tasks(self, session=None)
If the Airflow scheduler restarts with pending "Queued" tasks, the tasks may or
may not have been launched. On scheduler startup, every "Queued" task is
therefore checked to see whether it has been launched (i.e. whether there is a
corresponding pod on Kubernetes). If it has been launched, nothing is done;
otherwise its state is reset to "None" so that the task will be rescheduled.
This will not be necessary in a future version of Airflow in which there is
proper support for State.LAUNCHED.
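The reconciliation rule above can be sketched with plain dictionaries. The function name ``reset_unlaunched``, the string task keys, and the ``"queued"`` state literal are hypothetical stand-ins; the real method works against the Airflow metadata database and the Kubernetes API, which this sketch does not touch.

```python
from typing import Dict, Optional, Set


def reset_unlaunched(
    task_states: Dict[str, Optional[str]], launched: Set[str]
) -> Dict[str, Optional[str]]:
    """Return a copy of task_states with unlaunched queued tasks reset to None.

    task_states maps a task key to its state; launched holds the keys
    for which a corresponding pod was found.
    """
    reconciled = {}
    for task_key, state in task_states.items():
        if state == "queued" and task_key not in launched:
            # No pod exists, so the task never started: make it schedulable again.
            reconciled[task_key] = None
        else:
            # Launched or not queued: leave the state untouched.
            reconciled[task_key] = state
    return reconciled


states = {"task_a": "queued", "task_b": "queued", "task_c": "running"}
reconciled_states = reset_unlaunched(states, launched={"task_a"})
```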
.. method:: start(self)
Starts the executor
.. method:: execute_async(self, key: TaskInstanceKey, command: CommandType, queue: Optional[str] = None, executor_config: Optional[Any] = None)
Executes task asynchronously
.. method:: sync(self)
Synchronize task state.
.. method:: _change_state(self, key: TaskInstanceKey, state: Optional[str], pod_id: str, namespace: str)
.. method:: try_adopt_task_instances(self, tis: List[TaskInstance])
.. method:: adopt_launched_task(self, kube_client, pod, pod_ids: dict)
Patch an existing pod so that the current KubernetesJobWatcher can monitor it via label selectors.
:param kube_client: kubernetes client for speaking to kube API
:param pod: V1Pod spec that we will patch with new label
:param pod_ids: pod_ids we expect to patch.
.. method:: _adopt_completed_pods(self, kube_client: kubernetes.client.CoreV1Api)
Patch completed pod so that the KubernetesJobWatcher can delete it.
:param kube_client: kubernetes client for speaking to kube API
.. method:: _flush_task_queue(self)
.. method:: _flush_result_queue(self)
.. method:: end(self)
Called when the executor shuts down
.. method:: terminate(self)
Terminates the executor. This implementation does nothing.