# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
---
# Provider identity: the distribution name, the display name, and an
# RST-formatted description that links to the Celery documentation.
# NOTE(review): `state` and `source-date-epoch` are presumably the release
# lifecycle state and a Unix timestamp for reproducible builds - confirm
# against the provider schema.
package-name: apache-airflow-providers-celery
name: Celery
description: |
`Celery <https://docs.celeryq.dev/en/stable/>`__
state: ready
source-date-epoch: 1715384414
# Released versions of this provider, listed newest first.
# Note: this list is maintained by the release manager - do not update it manually.
versions:
- 3.7.0
- 3.6.2
- 3.6.1
- 3.6.0
- 3.5.2
- 3.5.1
- 3.5.0
- 3.4.1
- 3.4.0
- 3.3.4
- 3.3.3
- 3.3.2
- 3.3.1
- 3.3.0
- 3.2.1
- 3.2.0
- 3.1.0
- 3.0.0
- 2.1.4
- 2.1.3
- 2.1.2
- 2.1.1
- 2.1.0
- 2.0.0
- 1.0.1
- 1.0.0
# Requirement specifiers for the packages this provider depends on.
dependencies:
- apache-airflow>=2.7.0
# Celery is known to introduce problems when upgraded to a MAJOR version. Airflow Core
# uses Celery for the CeleryExecutor, and we also know that Celery follows SemVer
# (https://docs.celeryq.dev/en/stable/contributing.html?highlight=semver#versions).
# Make sure that the limit here is synchronized with the [celery] extra in the airflow core.
# The 5.3.3/5.3.2 exclusions come from https://github.com/celery/celery/issues/8470
- celery[redis]>=5.3.0,<6,!=5.3.3,!=5.3.2
- flower>=1.0.0
- google-re2>=1.0
# Optional extra named `cncf.kubernetes` that adds the cncf-kubernetes provider.
# NOTE(review): presumably required by the CeleryKubernetesExecutor listed under
# `executors` - confirm.
additional-extras:
- name: cncf.kubernetes
dependencies:
- apache-airflow-providers-cncf-kubernetes>=7.4.0
# Integration entry for Celery: external documentation URL, logo path, and tags.
integrations:
- integration-name: Celery
external-doc-url: https://docs.celeryq.dev/en/stable/
logo: /integration-logos/celery/Celery.png
tags: [software]
# Python modules that provide sensors for the Celery integration.
sensors:
- integration-name: Celery
python-modules:
- airflow.providers.celery.sensors.celery_queue
# Executor classes shipped by this provider, given as fully qualified class paths.
executors:
- airflow.providers.celery.executors.celery_executor.CeleryExecutor
- airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor
config:
celery_kubernetes_executor:
description: |
This section only applies if you are using the ``CeleryKubernetesExecutor`` in
``[core]`` section above
options:
# Queue name that routes a task to KubernetesExecutor instead of CeleryExecutor;
# the default queue name is "kubernetes".
kubernetes_queue:
description: |
Define when to send a task to ``KubernetesExecutor`` when using ``CeleryKubernetesExecutor``.
When the queue of a task is the value of ``kubernetes_queue`` (default ``kubernetes``),
the task is executed via ``KubernetesExecutor``,
otherwise via ``CeleryExecutor``
version_added: ~
type: string
example: ~
default: "kubernetes"
celery:
description: |
This section only applies if you are using the ``CeleryExecutor`` in
``[core]`` section above
options:
# Name of the Celery app; the default is the dotted path of the provider's
# celery_executor module.
celery_app_name:
description: |
The app name that will be used by celery
version_added: ~
type: string
example: ~
default: "airflow.providers.celery.executors.celery_executor"
# Number of task instances a worker takes when started with `airflow celery worker`.
# NOTE(review): declared as type string although the default is the numeric
# string "16"; the sibling worker_prefetch_multiplier is declared integer -
# confirm this is intentional.
worker_concurrency:
description: |
The concurrency that will be used when starting workers with the
``airflow celery worker`` command. This defines the number of task instances that
a worker will take, so size up your workers based on the resources on
your worker box and the nature of your tasks
version_added: ~
type: string
example: ~
default: "16"
# Autoscaling bounds for the worker pool, written as "max_concurrency,min_concurrency".
# No default is set; per the description, worker_concurrency is ignored when
# autoscaling is in use.
worker_autoscale:
description: |
The maximum and minimum number of pool processes that will be used to dynamically resize
the pool based on load. Enable autoscaling by providing max_concurrency,min_concurrency
with the ``airflow celery worker`` command (always keep minimum processes,
but grow to maximum if necessary).
Pick these numbers based on resources on worker box and the nature of the task.
If autoscale option is available, worker_concurrency will be ignored.
https://docs.celeryq.dev/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale
version_added: ~
type: string
example: "16,12"
default: ~
# Prefetch factor: number of processes multiplied by this value gives the number
# of tasks a worker prefetches. Default is 1.
worker_prefetch_multiplier:
description: |
Used to increase the number of tasks that a worker prefetches which can improve performance.
The number of processes multiplied by worker_prefetch_multiplier is the number of tasks
that are prefetched by a worker. A value greater than 1 can result in tasks being unnecessarily
blocked if there are multiple workers and one worker prefetches tasks that sit behind long
running tasks while another worker has unutilized processes that are unable to process the already
claimed blocked tasks.
https://docs.celeryq.dev/en/stable/userguide/optimizing.html#prefetch-limits
version_added: ~
type: integer
example: ~
default: "1"
# Toggles remote control of workers; enabled by default. Per the description,
# Flower does not work when this is disabled.
worker_enable_remote_control:
description: |
Specify if remote control of the workers is enabled.
In some cases when the broker does not support remote control, Celery creates lots of
``.*reply-celery-pidbox`` queues. You can prevent this by setting this to false.
However, with this disabled Flower won't work.
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html#broker-overview
version_added: ~
type: boolean
example: ~
default: "true"
# Connection URL of the Celery broker; the default points at a Redis host named "redis".
# NOTE(review): `sensitive: true` presumably causes the value to be masked where
# configuration is displayed - confirm against the config schema.
broker_url:
description: |
The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
a sqlalchemy database. Refer to the Celery documentation for more information.
version_added: ~
type: string
sensitive: true
example: ~
default: "redis://redis:6379/0"
# Where Celery stores task results. No default is set here; per the description,
# sql_alchemy_conn with a db+ scheme prefix is used when this is not specified.
# Marked sensitive because the URL may embed credentials (see the example).
result_backend:
description: |
The Celery result_backend. When a job finishes, it needs to update the
metadata of the job. Therefore it will post a message on a message bus,
or insert it into a database (depending on the backend).
This status is used by the scheduler to update the state of the task.
The use of a database is highly recommended.
When not specified, sql_alchemy_conn with a db+ scheme prefix will be used.
https://docs.celeryq.dev/en/latest/userguide/configuration.html#task-result-backend-settings
version_added: ~
type: string
sensitive: true
example: "db+postgresql://postgres:airflow@postgres/airflow"
default: ~
# Optional engine options for the result backend, supplied as a string holding
# a dictionary (see the example); empty by default.
result_backend_sqlalchemy_engine_options:
description: |
Optional configuration dictionary to pass to the Celery result backend SQLAlchemy engine.
version_added: ~
type: string
example: '{"pool_recycle": 1800}'
default: ""
# IP address that Celery Flower runs on when started via `airflow celery flower`.
# NOTE(review): the default 0.0.0.0 conventionally means "all interfaces" -
# confirm that this exposure is intended for default deployments.
flower_host:
description: |
Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
it ``airflow celery flower``. This defines the IP that Celery Flower runs on
version_added: ~
type: string
example: ~
default: "0.0.0.0"
# Root URL under which Flower is served; empty by default.
flower_url_prefix:
description: |
The root URL for Flower
version_added: ~
type: string
example: "/flower"
default: ""
# Port that Celery Flower runs on; default 5555.
# NOTE(review): declared as type string although the default is numeric -
# confirm consumers convert it.
flower_port:
description: |
This defines the port that Celery Flower runs on
version_added: ~
type: string
example: ~
default: "5555"
# Basic-auth credentials for Flower as comma-separated user:password pairs;
# marked sensitive, empty by default.
flower_basic_auth:
description: |
Securing Flower with Basic Authentication
Accepts user:password pairs separated by a comma
version_added: ~
type: string
sensitive: true
example: "user1:password1,user2:password2"
default: ""
# Number of processes CeleryExecutor uses to sync task state; 0 (the default)
# means max(1, number of cores - 1).
# NOTE(review): declared as type string although the default is numeric -
# confirm consumers convert it.
sync_parallelism:
description: |
How many processes CeleryExecutor uses to sync task state.
0 means to use max(1, number of cores - 1) processes.
version_added: ~
type: string
example: ~
default: "0"
# Dotted import path of the Celery configuration object; defaults to the
# provider's DEFAULT_CELERY_CONFIG.
celery_config_options:
description: |
Import path for celery configuration options
version_added: ~
type: string
example: ~
default: "airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG"
# NOTE(review): this option has no description (null). Judging by the name and
# the sibling ssl_key/ssl_cert/ssl_cacert options it presumably toggles SSL for
# the Celery connection - confirm and document.
# NOTE(review): declared as type string with the boolean-looking default
# "False", while other flags in this section use type boolean - confirm.
ssl_active:
description: ~
version_added: ~
type: string
example: ~
default: "False"
# Filesystem path to the client key; empty by default.
ssl_key:
description: |
Path to the client key.
version_added: ~
type: string
example: ~
default: ""
# Filesystem path to the client certificate; empty by default.
ssl_cert:
description: |
Path to the client certificate.
version_added: ~
type: string
example: ~
default: ""
# Filesystem path to the CA certificate; empty by default.
ssl_cacert:
description: |
Path to the CA certificate.
version_added: ~
type: string
example: ~
default: ""
# Celery worker pool implementation; one of prefork, eventlet, gevent or solo,
# defaulting to prefork.
pool:
description: |
Celery Pool implementation.
Choices include: ``prefork`` (default), ``eventlet``, ``gevent`` or ``solo``.
See:
https://docs.celeryq.dev/en/latest/userguide/workers.html#concurrency
https://docs.celeryq.dev/en/latest/userguide/concurrency/eventlet.html
version_added: ~
type: string
example: ~
default: "prefork"
# Timeout in seconds for send_task_to_executor / fetch_celery_task_state
# operations; a float, default 1.0.
operation_timeout:
description: |
The number of seconds to wait before timing out ``send_task_to_executor`` or
``fetch_celery_task_state`` operations.
version_added: ~
type: float
example: ~
default: "1.0"
# Makes Celery wait until a task finishes before assigning a new task instance,
# which effectively overrides the broker visibility timeout. Enabled by default;
# added in provider version 3.6.0.
task_acks_late:
description: |
If an Airflow task's execution time exceeds the visibility_timeout, Celery will re-assign the
task to a Celery worker, even if the original task is still running successfully. The new task
instance then runs concurrently with the original task and the Airflow UI and logs only show an
error message:
'Task Instance Not Running' FAILED: Task is in the running state
Setting task_acks_late to True will force Celery to wait until a task is finished before a
new task instance is assigned. This effectively overrides the visibility timeout.
See also:
https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late
version_added: 3.6.0
type: boolean
example: "True"
default: "True"
# Makes Celery tasks report the 'started' state so Airflow can track running
# tasks and adopt orphaned ones after a scheduler restart; enabled by default.
task_track_started:
description: |
Celery task will report its status as 'started' when the task is executed by a worker.
This is used in Airflow to keep track of the running tasks and if a Scheduler is restarted
or run in HA mode, it can adopt the orphan tasks launched by previous SchedulerJob.
version_added: ~
type: boolean
example: ~
default: "True"
# Retry limit for publishing a task message to the broker after an
# AirflowTaskTimeout, before the task is marked failed; default 3.
task_publish_max_retries:
description: |
The Maximum number of retries for publishing task messages to the broker when failing
due to ``AirflowTaskTimeout`` error before giving up and marking Task as failed.
version_added: ~
type: integer
example: ~
default: "3"
# Worker start-up check that validates the metadata database connection;
# off by default.
# NOTE(review): declared as type string with the boolean-looking default
# "False", while other flags in this section use type boolean - confirm.
worker_precheck:
description: |
Worker initialisation check to validate Metadata Database connection
version_added: ~
type: string
example: ~
default: "False"
celery_broker_transport_options:
description: |
This section is for specifying options which can be passed to the
underlying celery broker transport. See:
https://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-broker_transport_options
options:
# Seconds to wait for a worker to acknowledge a task before the message is
# redelivered; per the description, only supported for Redis and SQS brokers.
# No default is set here.
# NOTE(review): declared as type string although the example ("21600") is
# numeric - confirm consumers convert it.
visibility_timeout:
description: |
The visibility timeout defines the number of seconds to wait for the worker
to acknowledge the task before the message is redelivered to another worker.
Make sure to increase the visibility timeout to match the time of the longest
ETA you're planning to use.
visibility_timeout is only supported for Redis and SQS celery brokers.
See:
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
version_added: ~
type: string
example: "21600"
default: ~
# Extra options for the Redis Sentinel client, supplied as a string in
# dictionary format (see the example). Marked sensitive because it can carry
# the Redis password; no default is set.
sentinel_kwargs:
description: |
The sentinel_kwargs parameter allows passing additional options to the Sentinel client.
In a typical scenario where Redis Sentinel is used as the broker and Redis servers are
password-protected, the password needs to be passed through this parameter. Although its
type is string, it is required to pass a string that conforms to the dictionary format.
See:
https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
version_added: ~
type: string
sensitive: true
example: '{"password": "password_for_redis_server"}'
default: ~