:mod:`airflow.contrib.executors.kubernetes_executor`
====================================================
.. py:module:: airflow.contrib.executors.kubernetes_executor
Module Contents
---------------
.. py:class:: KubernetesExecutorConfig(image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, gcp_service_account_key=None, node_selectors=None, affinity=None, annotations=None, volumes=None, volume_mounts=None, tolerations=None)
.. method:: __repr__(self)
.. staticmethod:: from_dict(obj)
.. method:: as_dict(self)
.. py:class:: KubeConfig
.. attribute:: core_section
:annotation: = core
.. attribute:: kubernetes_section
:annotation: = kubernetes
.. method:: _validate(self)
.. py:class:: KubernetesJobWatcher(namespace, watcher_queue, resource_version, worker_uuid)
Bases: :class:`multiprocessing.Process`, :class:`airflow.utils.log.logging_mixin.LoggingMixin`, :class:`object`
.. method:: run(self)
.. method:: _run(self, kube_client, resource_version, worker_uuid)
.. method:: process_error(self, event)
.. method:: process_status(self, pod_id, status, labels, resource_version)
.. py:class:: AirflowKubernetesScheduler(kube_config, task_queue, result_queue, kube_client, worker_uuid)
Bases: :class:`airflow.utils.log.logging_mixin.LoggingMixin`
.. method:: _make_kube_watcher(self)
.. method:: _health_check_kube_watcher(self)
.. method:: run_next(self, next_job)
The run_next command checks the task_queue for any un-run jobs. It then creates
a unique job id, launches that job in the cluster, and stores the relevant info
in the current_jobs map so we can track the job's status.
.. method:: delete_pod(self, pod_id)
.. method:: sync(self)
The sync function checks the status of all currently running kubernetes jobs.
If a job is completed, its status is placed in the result queue to be sent back
to the scheduler.
.. method:: process_watcher_task(self)
.. staticmethod:: _strip_unsafe_kubernetes_special_chars(string)
Kubernetes only supports lowercase alphanumeric characters, "-", and "." in
the pod name. However, there are special rules about how "-" and "." can be
used, so let's only keep alphanumeric chars; see here for details:
https://kubernetes.io/docs/concepts/overview/working-with-objects/names/
:param string: the requested Pod name
:return: ``str`` Pod name stripped of any unsafe characters
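A minimal sketch of such a sanitizer, assuming we simply drop everything
non-alphanumeric and lowercase the rest (the function name here is
illustrative, not necessarily the module's exact code):

.. code-block:: python

   def strip_unsafe_kubernetes_special_chars(string):
       # Keep only alphanumeric characters, lowercased; "-" and "." are
       # dropped outright rather than validated against the placement rules.
       return ''.join(ch.lower() for ch in string if ch.isalnum())

For example, ``strip_unsafe_kubernetes_special_chars('My_Dag.Task-1')``
yields ``'mydagtask1'``.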
.. staticmethod:: _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid)
Kubernetes pod names must be <= 253 chars and must pass the following regex
for validation: ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
:param safe_dag_id: a dag_id with only alphanumeric characters
:param safe_task_id: a task_id with only alphanumeric characters
:param safe_uuid: a uuid
:return: ``str`` valid Pod name of appropriate length
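A sketch of how such an id could be assembled under the 253-char limit above
(illustrative, not necessarily the exact implementation):

.. code-block:: python

   MAX_POD_ID_LEN = 253

   def make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid):
       # Concatenate the pre-sanitized parts, then truncate so the final id,
       # including the trailing "-<uuid>", fits within MAX_POD_ID_LEN.
       safe_key = safe_dag_id + safe_task_id
       return safe_key[:MAX_POD_ID_LEN - len(safe_uuid) - 1] + '-' + safe_uuid

Keeping the uuid suffix intact and truncating only the dag/task portion
preserves uniqueness across pods for the same task.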
.. staticmethod:: _make_safe_label_value(string)
Valid label values must be 63 characters or less and must be empty or begin and
end with an alphanumeric character ([a-z0-9A-Z]), with dashes (-), underscores (_),
dots (.), and alphanumerics between.
If the label value is longer than 63 chars once made safe, or differs in any
way from the original value sent to this function, we truncate it to 53 chars
and append a unique hash.
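A sketch of that truncate-and-hash scheme; the MD5-based suffix is an
assumption for illustration:

.. code-block:: python

   import hashlib
   import re

   MAX_LABEL_LEN = 63

   def make_safe_label_value(string):
       # Trim leading/trailing characters that aren't alphanumeric.
       safe_label = re.sub(r'^[^a-zA-Z0-9]*|[^a-zA-Z0-9]*$', '', string)
       if len(safe_label) > MAX_LABEL_LEN or string != safe_label:
           # Truncate to 53 chars and append a short hash of the original
           # value so distinct inputs still map to distinct labels.
           safe_hash = hashlib.md5(string.encode()).hexdigest()[:9]
           safe_label = safe_label[:MAX_LABEL_LEN - len(safe_hash) - 1] + '-' + safe_hash
       return safe_label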
.. staticmethod:: _create_pod_id(dag_id, task_id)
.. staticmethod:: _label_safe_datestring_to_datetime(string)
Kubernetes doesn't permit ":" in labels; the label-safe datestring replaces
":" with "_", so here we convert "_" back to ":" and parse the result.
:param string: str
:return: datetime.datetime object
.. staticmethod:: _datetime_to_label_safe_datestring(datetime_obj)
Kubernetes doesn't permit ":" in labels, and the ISO datetime format uses ":"
but not "_", so let's replace ":" with "_".
:param datetime_obj: datetime.datetime object
:return: ISO-like string representing the datetime
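A round-trip sketch of the two conversions; the ``_plus_`` handling of UTC
offsets is an assumption about the label-safe encoding:

.. code-block:: python

   from dateutil import parser

   def datetime_to_label_safe_datestring(datetime_obj):
       # "2019-01-01T00:00:00+00:00" -> "2019-01-01T00_00_00_plus_00_00"
       return datetime_obj.isoformat().replace(':', '_').replace('+', '_plus_')

   def label_safe_datestring_to_datetime(string):
       # Reverse the substitutions above, then parse back into a datetime.
       return parser.parse(string.replace('_plus_', '+').replace('_', ':'))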
.. method:: _labels_to_key(self, labels)
.. py:class:: KubernetesExecutor
Bases: :class:`airflow.executors.base_executor.BaseExecutor`, :class:`airflow.utils.log.logging_mixin.LoggingMixin`
.. method:: clear_not_launched_queued_tasks(self, session=None)
If the airflow scheduler restarts with pending "Queued" tasks, the tasks may
or may not have been launched. Thus, on starting up the scheduler let's check
every "Queued" task to see if it has been launched (i.e. whether there is a
corresponding pod on kubernetes). If it has been launched, do nothing;
otherwise reset the state to "None" so the task will be rescheduled (see the
sketch below). This will not be necessary in a future version of airflow in
which there is proper support for State.LAUNCHED.
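A sketch of that startup check, assuming the executor holds a ``kube_client``
(``kubernetes.client.CoreV1Api``) and a ``kube_config`` with the target
namespace; the label-selector format is illustrative:

.. code-block:: python

   from airflow.models import TaskInstance
   from airflow.utils.state import State

   def clear_not_launched_queued_tasks(self, session=None):
       queued = session.query(TaskInstance).filter(
           TaskInstance.state == State.QUEUED).all()
       for task in queued:
           # Ask the cluster for a pod labelled with this task instance's identity.
           selector = 'dag_id={},task_id={},execution_date={}'.format(
               task.dag_id, task.task_id,
               AirflowKubernetesScheduler._datetime_to_label_safe_datestring(
                   task.execution_date))
           pods = self.kube_client.list_namespaced_pod(
               self.kube_config.kube_namespace, label_selector=selector)
           if not pods.items:
               # Never launched: reset to None so the scheduler re-queues it.
               task.state = State.NONE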
.. method:: _inject_secrets(self)
.. method:: start(self)
.. method:: execute_async(self, key, command, queue=None, executor_config=None)
.. method:: sync(self)
.. method:: _change_state(self, key, state, pod_id)
.. method:: end(self)