#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Celery command"""
import os
import signal
import sys
from multiprocessing import Process
from typing import Optional
import daemon
import psutil
from celery.bin import worker as worker_bin
from daemon.pidfile import TimeoutPIDLockFile
from lockfile.pidlockfile import read_pid_from_pidfile, remove_existing_pidfile
from airflow import settings
from airflow.configuration import conf
from airflow.executors.celery_executor import app as celery_app
from airflow.utils import cli as cli_utils
from airflow.utils.cli import setup_locations, setup_logging, sigint_handler
from airflow.utils.serve_logs import serve_logs
WORKER_PROCESS_NAME = "worker"
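
# Each handler below backs one of the ``airflow celery ...`` subcommands;
# the argument parsing and CLI wiring live in the Airflow CLI parser, not
# in this module.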


@cli_utils.action_logging
def flower(args):
    """Starts Flower, the Celery monitoring tool"""
    broker = conf.get('celery', 'BROKER_URL')
    # Build the argv list for Flower, appending optional flags only when
    # they are set, so that no empty-string arguments reach execvp().
    flower_cmd = [
        'flower',
        '-b',
        broker,
        '--address={}'.format(args.hostname),
        '--port={}'.format(args.port),
    ]
    if args.broker_api:
        flower_cmd.append('--broker_api=' + args.broker_api)
    if args.url_prefix:
        flower_cmd.append('--url-prefix=' + args.url_prefix)
    if args.basic_auth:
        flower_cmd.append('--basic_auth=' + args.basic_auth)
    if args.flower_conf:
        flower_cmd.append('--conf=' + args.flower_conf)

    if args.daemon:
        pid, stdout, stderr, _ = setup_locations("flower", args.pid, args.stdout, args.stderr, args.log_file)
        with open(stdout, 'w+') as stdout_handle, open(stderr, 'w+') as stderr_handle:
            ctx = daemon.DaemonContext(
                pidfile=TimeoutPIDLockFile(pid, -1),
                stdout=stdout_handle,
                stderr=stderr_handle,
            )
            with ctx:
                # execvp() replaces the daemonized process image with
                # Flower, so no code after this call ever runs.
                os.execvp("flower", flower_cmd)
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)
        os.execvp("flower", flower_cmd)
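
# For reference, the handler above ends up exec'ing a command line roughly
# like the following (all values illustrative only):
#
#   flower -b redis://localhost:6379/0 --address=0.0.0.0 --port=5555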


def _serve_logs(skip_serve_logs: bool = False) -> Optional[Process]:
    """Starts the serve_logs sub-process"""
    if not skip_serve_logs:
        sub_proc = Process(target=serve_logs)
        sub_proc.start()
        return sub_proc
    return None
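
# Note: the worker command below starts this log server alongside the Celery
# worker so the webserver can fetch task log files over HTTP (the port comes
# from the ``worker_log_server_port`` config option, 8793 by default).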


@cli_utils.action_logging
def worker(args):
    """Starts an Airflow Celery worker"""
    # Fail fast if the metadata database is unreachable.
    if not settings.validate_session():
        print("Worker exiting, database connection precheck failed!")
        sys.exit(1)

    autoscale = args.autoscale
    skip_serve_logs = args.skip_serve_logs
    if autoscale is None and conf.has_option("celery", "worker_autoscale"):
        autoscale = conf.get("celery", "worker_autoscale")

    # Setup locations
    pid_file_path, stdout, stderr, log_file = setup_locations(
        process=WORKER_PROCESS_NAME,
        pid=args.pid,
        stdout=args.stdout,
        stderr=args.stderr,
        log=args.log_file,
    )

    # Setup the Celery worker and map CLI arguments onto its options.
    worker_instance = worker_bin.worker(app=celery_app)
    options = {
        # Both keys request Celery's ``-O fair`` scheduling strategy.
        'optimization': 'fair',
        'O': 'fair',
        'queues': args.queues,
        'concurrency': args.concurrency,
        'autoscale': autoscale,
        'hostname': args.celery_hostname,
        'loglevel': conf.get('logging', 'LOGGING_LEVEL'),
        'pidfile': pid_file_path,
    }
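
    # The options above are roughly equivalent to launching the worker
    # directly with the Celery CLI (flag names from Celery 4.x; shown for
    # orientation only):
    #
    #   celery worker -O fair -q <queues> -c <concurrency> \
    #       -n <hostname> --loglevel INFO --pidfile <pid_file_path>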
    if conf.has_option("celery", "pool"):
        options["pool"] = conf.get("celery", "pool")

    if args.daemon:
        # Run Celery worker as daemon
        handle = setup_logging(log_file)
        stdout_handle = open(stdout, 'w+')
        stderr_handle = open(stderr, 'w+')
        ctx = daemon.DaemonContext(
            files_preserve=[handle],
            stdout=stdout_handle,
            stderr=stderr_handle,
        )
        with ctx:
            sub_proc = _serve_logs(skip_serve_logs)
            worker_instance.run(**options)
        stdout_handle.close()
        stderr_handle.close()
    else:
        # Run Celery worker in the same process
        sub_proc = _serve_logs(skip_serve_logs)
        worker_instance.run(**options)

    # Stop the log server once the worker has shut down.
    if sub_proc:
        sub_proc.terminate()


@cli_utils.action_logging
def stop_worker(args):  # pylint: disable=unused-argument
    """Sends SIGTERM to the Celery worker"""
    # Read the PID from the file written by the worker command.
    pid_file_path, _, _, _ = setup_locations(process=WORKER_PROCESS_NAME)
    pid = read_pid_from_pidfile(pid_file_path)

    # Send SIGTERM; Celery treats it as a request for warm shutdown.
    if pid:
        worker_process = psutil.Process(pid)
        worker_process.terminate()

    # Remove the PID file whether or not a live process was found.
    remove_existing_pidfile(pid_file_path)
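
# Typical usage via the Airflow CLI (subcommand names as wired in
# Airflow 2.x; shown for orientation only):
#
#   airflow celery worker --daemon
#   airflow celery flower
#   airflow celery stop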