:py:mod:`airflow.executors.celery_executor`
===========================================
.. py:module:: airflow.executors.celery_executor
.. autoapi-nested-parse::
CeleryExecutor
.. seealso::
For more information on how the CeleryExecutor works, take a look at the guide:
:ref:`executor:CeleryExecutor`
Module Contents
---------------
Classes
~~~~~~~
.. autoapisummary::
airflow.executors.celery_executor.ExceptionWithTraceback
airflow.executors.celery_executor.CeleryExecutor
airflow.executors.celery_executor.BulkStateFetcher
Functions
~~~~~~~~~
.. autoapisummary::
airflow.executors.celery_executor.execute_command
airflow.executors.celery_executor.send_task_to_executor
airflow.executors.celery_executor.on_celery_import_modules
airflow.executors.celery_executor.fetch_celery_task_state
Attributes
~~~~~~~~~~
.. autoapisummary::
airflow.executors.celery_executor.log
airflow.executors.celery_executor.CELERY_FETCH_ERR_MSG_HEADER
airflow.executors.celery_executor.CELERY_SEND_ERR_MSG_HEADER
airflow.executors.celery_executor.OPERATION_TIMEOUT
airflow.executors.celery_executor.celery_configuration
airflow.executors.celery_executor.app
airflow.executors.celery_executor.TaskInstanceInCelery
.. py:data:: log
.. py:data:: CELERY_FETCH_ERR_MSG_HEADER
:annotation: = Error fetching Celery task state
.. py:data:: CELERY_SEND_ERR_MSG_HEADER
:annotation: = Error sending Celery task
.. py:data:: OPERATION_TIMEOUT
To start the Celery worker, run the command ``airflow celery worker``.
.. py:data:: celery_configuration
.. py:data:: app
.. py:function:: execute_command(command_to_exec: airflow.executors.base_executor.CommandType) -> None
Executes command.
.. py:class:: ExceptionWithTraceback(exception: Exception, exception_traceback: str)
Wrapper class used to propagate exceptions to parent processes from subprocesses.
:param exception: The exception to wrap
:type exception: Exception
:param exception_traceback: The stacktrace to wrap
:type exception_traceback: str
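The wrapping pattern described above can be sketched as follows. This is a minimal illustration, not the Airflow implementation: ``WrappedError``, ``run_safely`` and ``divide`` are hypothetical names, and only the two constructor parameters mirror the signature documented here.

```python
import traceback


class WrappedError:
    """Hypothetical stand-in for ExceptionWithTraceback (not the Airflow class)."""

    def __init__(self, exception: Exception, exception_traceback: str):
        self.exception = exception
        # The traceback is stored as a string so it survives being sent
        # between processes, where live traceback objects cannot travel.
        self.traceback = exception_traceback


def run_safely(func, *args):
    """Return func's result, or a WrappedError instead of raising."""
    try:
        return func(*args)
    except Exception as exc:
        return WrappedError(exc, traceback.format_exc())


def divide(a, b):
    return a / b


result = run_safely(divide, 1, 0)
if isinstance(result, WrappedError):
    print("worker failed:", repr(result.exception))
    print(result.traceback)
```

The caller checks the type of the returned value rather than catching an exception, which is why the functions below list this wrapper as a possible element of their return tuples.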
.. py:data:: TaskInstanceInCelery
.. py:function:: send_task_to_executor(task_tuple: TaskInstanceInCelery) -> Tuple[airflow.models.taskinstance.TaskInstanceKey, airflow.executors.base_executor.CommandType, Union[celery.result.AsyncResult, ExceptionWithTraceback]]
Sends task to executor.
.. py:function:: on_celery_import_modules(*args, **kwargs)
Preload some "expensive" Airflow modules so that each task process doesn't have to import them again
and again.
Loading these modules for each task adds 0.3-0.5s *per task* before the task can run. For long-running tasks
this doesn't matter, but for short tasks the delay becomes noticeable.
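The idea of preloading can be sketched with the standard library alone. The helper name ``preload_modules`` and the module names below are placeholders chosen for illustration; the modules Airflow actually preloads are not listed in this reference.

```python
import importlib
import sys


def preload_modules(names):
    """Import each module once in the parent process.

    Processes forked afterwards inherit the already populated
    sys.modules, so they skip the import cost. Missing optional
    modules are ignored.
    """
    loaded = []
    for name in names:
        try:
            importlib.import_module(name)
        except ImportError:
            continue  # optional module is not installed; skip it
        loaded.append(name)
    return loaded


# Placeholder module names, not the ones Airflow preloads.
loaded = preload_modules(["json", "decimal", "module_that_does_not_exist_xyz"])
print(loaded)
```

The benefit depends on the worker forking its task processes after the import has happened; with a spawn-based start method each child would import the modules again.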
.. py:class:: CeleryExecutor
Bases: :py:obj:`airflow.executors.base_executor.BaseExecutor`
CeleryExecutor is recommended for production use of Airflow. It allows
distributing the execution of task instances to multiple worker nodes.
Celery is a simple, flexible and reliable distributed system to process
vast amounts of messages, while providing operations with the tools
required to maintain such a system.
.. py:method:: start(self) -> None
Executors may need to get things started.
.. py:method:: trigger_tasks(self, open_slots: int) -> None
Overrides the ``trigger_tasks`` method of BaseExecutor.
:param open_slots: Number of open slots
:return:
.. py:method:: sync(self) -> None
Sync will get called periodically by the heartbeat method.
Executors should override this to gather task statuses.
.. py:method:: debug_dump(self) -> None
Called by the scheduler in response to SIGUSR2.
.. py:method:: update_all_task_states(self) -> None
Updates states of the tasks.
.. py:method:: change_state(self, key: airflow.models.taskinstance.TaskInstanceKey, state: str, info=None) -> None
Changes state of the task.
:param info: Executor information for the task instance
:param key: Unique key for the task instance
:param state: State to set for the task.
.. py:method:: update_task_state(self, key: airflow.models.taskinstance.TaskInstanceKey, state: str, info: Any) -> None
Updates state of a single task.
.. py:method:: end(self, synchronous: bool = False) -> None
This method is called when the caller is done submitting jobs and
wants to wait synchronously for all previously submitted jobs to
finish.
.. py:method:: execute_async(self, key: airflow.models.taskinstance.TaskInstanceKey, command: airflow.executors.base_executor.CommandType, queue: Optional[str] = None, executor_config: Optional[Any] = None)
Do not allow async execution for Celery executor.
.. py:method:: terminate(self)
This method is called when the daemon receives a SIGTERM.
.. py:method:: try_adopt_task_instances(self, tis: List[airflow.models.taskinstance.TaskInstance]) -> List[airflow.models.taskinstance.TaskInstance]
Try to adopt running task instances that were abandoned when a SchedulerJob died.
Anything that is not adopted will be cleared by the scheduler (and then become eligible for
re-scheduling).
:return: any TaskInstances that were unable to be adopted
:rtype: list[airflow.models.TaskInstance]
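The contract of this method (adopt what can be adopted, return the rest) can be sketched as a simple partition. Everything here is an assumption made for illustration: ``FakeTaskInstance``, ``try_adopt`` and the rule "adopt if the stored executor id is still known" are hypothetical and not taken from the Airflow source.

```python
from dataclasses import dataclass
from typing import List, Optional, Set, Tuple


@dataclass
class FakeTaskInstance:
    """Hypothetical stand-in for airflow.models.TaskInstance."""

    task_id: str
    external_executor_id: Optional[str] = None


def try_adopt(
    tis: List[FakeTaskInstance], known_celery_ids: Set[str]
) -> Tuple[List[FakeTaskInstance], List[FakeTaskInstance]]:
    """Split task instances into (adopted, not_adopted).

    Assumed rule: a task instance is adopted only if the Celery task id
    it recorded is still known to the result backend.
    """
    adopted, not_adopted = [], []
    for ti in tis:
        if ti.external_executor_id in known_celery_ids:
            adopted.append(ti)
        else:
            not_adopted.append(ti)
    return adopted, not_adopted


tis = [
    FakeTaskInstance("extract", "celery-1"),
    FakeTaskInstance("transform", "celery-9"),
    FakeTaskInstance("load"),  # never reached a worker
]
adopted, not_adopted = try_adopt(tis, {"celery-1", "celery-2"})
print([ti.task_id for ti in not_adopted])
```

Only the second list corresponds to the documented return value; the scheduler then clears those task instances so they can be scheduled again.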
.. py:function:: fetch_celery_task_state(async_result: celery.result.AsyncResult) -> Tuple[str, Union[str, ExceptionWithTraceback], Any]
Fetch and return the state of the given Celery task. This function is defined at
module level so that it can be called by subprocesses in the pool.
:param async_result: the async Celery object used to fetch the task's state
:type async_result: celery.result.AsyncResult
:return: a tuple of the Celery task key, the Celery state (or an ExceptionWithTraceback
if fetching the state failed), and the Celery info of the task
:rtype: tuple[str, Union[str, ExceptionWithTraceback], Any]
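A minimal sketch of the return shape, assuming a stand-in result object. ``FakeAsyncResult`` and ``fetch_state`` are hypothetical names; the real function receives a ``celery.result.AsyncResult``, whose ``task_id``, ``state`` and ``info`` attributes the stand-in imitates. In this sketch the failure case is represented by a plain ``(exception, traceback string)`` pair rather than an ``ExceptionWithTraceback`` instance.

```python
import traceback
from typing import Any, Tuple


class FakeAsyncResult:
    """Hypothetical stand-in for celery.result.AsyncResult."""

    def __init__(self, task_id: str, state: str, info: Any = None, fail: bool = False):
        self.task_id = task_id
        self._state = state
        self.info = info
        self._fail = fail

    @property
    def state(self) -> str:
        # Reading the state talks to the result backend, which can fail.
        if self._fail:
            raise ConnectionError("result backend unreachable")
        return self._state


def fetch_state(async_result) -> Tuple[str, Any, Any]:
    """Return (task id, state or error pair, info) without raising."""
    try:
        return async_result.task_id, async_result.state, async_result.info
    except Exception as exc:
        return async_result.task_id, (exc, traceback.format_exc()), None


ok = fetch_state(FakeAsyncResult("abc", "SUCCESS", info={"rows": 3}))
failed = fetch_state(FakeAsyncResult("def", "PENDING", fail=True))
print(ok)
print(failed[0], type(failed[1][0]).__name__)
```

Returning the error as data keeps one failing lookup from aborting a whole batch of lookups running in a pool.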
.. py:class:: BulkStateFetcher(sync_parallelism=None)
Bases: :py:obj:`airflow.utils.log.logging_mixin.LoggingMixin`
Gets status for many Celery tasks using the best method available.
If BaseKeyValueStoreBackend is used as the result backend, the ``mget`` method is used.
If DatabaseBackend is used as the result backend, a ``SELECT ... WHERE task_id IN (...)`` query is used.
Otherwise, multiprocessing.Pool is used and each task status is fetched individually.
.. py:method:: get_many(self, async_results) -> Mapping[str, airflow.executors.base_executor.EventBufferValueType]
Gets status for many Celery tasks using the best method available.
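The backend-dependent dispatch can be sketched as below. The two backend classes and the module-level ``get_many`` function are hypothetical stand-ins, the database branch is omitted for brevity, and a thread pool is swapped in for ``multiprocessing.Pool`` to keep the example self-contained.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional


class KeyValueBackend:
    """Hypothetical backend that supports one bulk lookup (like mget)."""

    def __init__(self, store: Dict[str, str]):
        self.store = store

    def mget(self, keys: List[str]) -> List[Optional[str]]:
        return [self.store.get(key) for key in keys]


class OtherBackend:
    """Hypothetical backend that only supports single lookups."""

    def __init__(self, store: Dict[str, str]):
        self.store = store

    def get(self, key: str) -> Optional[str]:
        return self.store.get(key)


def get_many(backend, task_ids: List[str]) -> Dict[str, Optional[str]]:
    """Fetch states in bulk when possible, otherwise one by one in a pool."""
    if isinstance(backend, KeyValueBackend):
        states = backend.mget(task_ids)  # one round trip for all ids
    else:
        # Fallback: one lookup per task, run concurrently.
        with ThreadPoolExecutor(max_workers=4) as pool:
            states = list(pool.map(backend.get, task_ids))
    return dict(zip(task_ids, states))


store = {"a": "SUCCESS", "b": "FAILURE"}
ids = ["a", "b", "c"]
bulk = get_many(KeyValueBackend(store), ids)
fallback = get_many(OtherBackend(store), ids)
print(bulk)
```

Both paths return the same mapping; the bulk path only reduces the number of round trips to the result backend.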