| |
| |
| :mod:`airflow.contrib.executors.kubernetes_executor` |
| ==================================================== |
| |
| .. py:module:: airflow.contrib.executors.kubernetes_executor |
| |
| |
| |
| |
| |
| |
| |
| Module Contents |
| --------------- |
| |
| |
| |
| |
| |
| |
| .. py:class:: KubernetesExecutorConfig(image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, gcp_service_account_key=None, node_selectors=None, affinity=None, annotations=None, volumes=None, volume_mounts=None, tolerations=None) |
| |
| |
| |
| |
| |
| .. method:: __repr__(self) |
| |
| |
| |
| |
| |
| |
| |
| .. staticmethod:: from_dict(obj) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: as_dict(self) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| .. py:class:: KubeConfig |
| |
| |
| |
| .. attribute:: core_section |
| :annotation: = core |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| .. attribute:: kubernetes_section |
| :annotation: = kubernetes |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| .. method:: _validate(self) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| .. py:class:: KubernetesJobWatcher(namespace, watcher_queue, resource_version, worker_uuid) |
| |
| Bases: :class:`multiprocessing.Process`, :class:`airflow.utils.log.logging_mixin.LoggingMixin`, :class:`object` |
| |
| |
| |
| |
| |
| |
| |
| .. method:: run(self) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: _run(self, kube_client, resource_version, worker_uuid) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: process_error(self, event) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: process_status(self, pod_id, status, labels, resource_version) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| .. py:class:: AirflowKubernetesScheduler(kube_config, task_queue, result_queue, kube_client, worker_uuid) |
| |
| Bases: :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| |
| |
| |
| |
| |
| |
| .. method:: _make_kube_watcher(self) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: _health_check_kube_watcher(self) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: run_next(self, next_job) |
| |
| |
| The run_next command will check the task_queue for any un-run jobs. |
| It will then create a unique job-id, launch that job in the cluster, |
| and store relevant info in the current_jobs map so we can track the job's |
| status. |
| |
| |
| |
| |
| |
| |
| |
| .. method:: delete_pod(self, pod_id) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: sync(self) |
| |
| |
| The sync function checks the status of all currently running kubernetes jobs. |
| If a job is completed, its status is placed in the result queue to |
| be sent back to the scheduler. |
| |
| :return: |
| |
| |
| |
| |
| |
| |
| |
| .. method:: process_watcher_task(self) |
| |
| |
| |
| |
| |
| |
| |
| .. staticmethod:: _strip_unsafe_kubernetes_special_chars(string) |
| |
| |
| Kubernetes only supports lowercase alphanumeric characters, "-" and "." in |
| the pod name. However, there are special rules about how "-" and "." can be |
| used, so let's only keep alphanumeric chars. See here for details: |
| https://kubernetes.io/docs/concepts/overview/working-with-objects/names/ |
| |
| :param string: The requested Pod name |
| :return: ``str`` Pod name stripped of any unsafe characters |
| |
| |
| |
| |
| |
| |
| |
| .. staticmethod:: _make_safe_pod_id(safe_dag_id, safe_task_id, safe_uuid) |
| |
| |
| Kubernetes pod names must be <= 253 chars and must pass the following regex for |
| validation: |
| ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$`` |
| |
| :param safe_dag_id: a dag_id with only alphanumeric characters |
| :param safe_task_id: a task_id with only alphanumeric characters |
| :param safe_uuid: a uuid |
| :return: ``str`` valid Pod name of appropriate length |
| |
| |
| |
| |
| |
| |
| |
| .. staticmethod:: _make_safe_label_value(string) |
| |
| |
| Valid label values must be 63 characters or less and must be empty or begin and |
| end with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), |
| dots (.), and alphanumerics between. |
| |
| If the label value is greater than 63 chars once made safe, or differs in any |
| way from the original value sent to this function, then we need to truncate it to |
| 53 chars and append a unique hash to it. |
| |
| |
| |
| |
| |
| |
| |
| .. staticmethod:: _create_pod_id(dag_id, task_id) |
| |
| |
| |
| |
| |
| |
| |
| .. staticmethod:: _label_safe_datestring_to_datetime(string) |
| |
| |
| Kubernetes doesn't permit ":" in labels. ISO datetime format uses ":" but not |
| "_", so label-safe date strings use "_" in place of ":". |
| |
| :param string: str |
| :return: datetime.datetime object |
| |
| |
| |
| |
| |
| |
| |
| .. staticmethod:: _datetime_to_label_safe_datestring(datetime_obj) |
| |
| |
| Kubernetes doesn't like ":" in labels. Since ISO datetime format uses ":" but |
| not "_", let's replace ":" with "_". |
| |
| :param datetime_obj: datetime.datetime object |
| :return: ISO-like string representing the datetime |
| |
| |
| |
| |
| |
| |
| |
| .. method:: _labels_to_key(self, labels) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| .. py:class:: KubernetesExecutor |
| |
| Bases: :class:`airflow.executors.base_executor.BaseExecutor`, :class:`airflow.utils.log.logging_mixin.LoggingMixin` |
| |
| |
| |
| |
| |
| |
| |
| .. method:: clear_not_launched_queued_tasks(self, session=None) |
| |
| |
| If the airflow scheduler restarts with pending "Queued" tasks, the tasks may or |
| may not have been launched. Thus, on starting up the scheduler, let's check every |
| "Queued" task to see if it has been launched (i.e. if there is a corresponding |
| pod on kubernetes). |
| |
| If it has been launched then do nothing, otherwise reset the state to "None" so |
| the task |
| will be rescheduled |
| |
| This will not be necessary in a future version of airflow in which there is |
| proper support |
| for State.LAUNCHED |
| |
| |
| |
| |
| |
| |
| |
| .. method:: _inject_secrets(self) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: start(self) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: execute_async(self, key, command, queue=None, executor_config=None) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: sync(self) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: _change_state(self, key, state, pod_id) |
| |
| |
| |
| |
| |
| |
| |
| .. method:: end(self) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |