[AIRFLOW-6620] Mock celery in worker cli test (#7243)
Actual tests start celery worker and doesn't stop it. Sometimes
it result in timeout of a test.
diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py
index ee78fc2..9f8cdfb 100644
--- a/tests/cli/commands/test_celery_command.py
+++ b/tests/cli/commands/test_celery_command.py
@@ -27,6 +27,7 @@
import airflow
from airflow.bin import cli
from airflow.cli.commands import celery_command
+from airflow.configuration import conf
from tests.test_utils.config import conf_vars
mock.patch('airflow.utils.cli.action_logging', lambda x: x).start()
@@ -42,7 +43,6 @@
"""
mock_validate_session.return_value = False
with self.assertRaises(SystemExit) as cm:
- # airflow.bin.cli.worker(mock_args)
celery_command.worker(mock_args)
self.assertEqual(cm.exception.code, 1)
@@ -77,18 +77,20 @@
def tearDown(self):
importlib.reload(cli)
- def test_serve_logs_on_worker_start(self):
+ @mock.patch('airflow.cli.commands.celery_command.worker_bin')
+ def test_serve_logs_on_worker_start(self, mock_worker):
with mock.patch('airflow.cli.commands.celery_command.Process') as mock_process:
- args = self.parser.parse_args(['celery', 'worker', '-c', '-1'])
+ args = self.parser.parse_args(['celery', 'worker', '-c', '1'])
with mock.patch('celery.platforms.check_privileges') as mock_privil:
mock_privil.return_value = 0
celery_command.worker(args)
mock_process.assert_called()
- def test_skip_serve_logs_on_worker_start(self):
+ @mock.patch('airflow.cli.commands.celery_command.worker_bin')
+ def test_skip_serve_logs_on_worker_start(self, mock_worker):
with mock.patch('airflow.cli.commands.celery_command.Process') as mock_popen:
- args = self.parser.parse_args(['celery', 'worker', '-c', '-1', '-s'])
+ args = self.parser.parse_args(['celery', 'worker', '-c', '1', '-s'])
with mock.patch('celery.platforms.check_privileges') as mock_privil:
mock_privil.return_value = 0
@@ -103,6 +105,9 @@
importlib.reload(cli)
cls.parser = cli.CLIFactory.get_parser()
+ def tearDown(self):
+ importlib.reload(cli)
+
@mock.patch("airflow.cli.commands.celery_command.setup_locations")
@mock.patch("airflow.cli.commands.celery_command.psutil.Process")
def test_if_right_pid_is_read(self, mock_process, mock_setup_locations):
@@ -147,3 +152,53 @@
stop_args = self.parser.parse_args(['celery', 'stop'])
celery_command.stop_worker(stop_args)
mock_read_pid_from_pidfile.assert_called_once_with(pid_file)
+
+
+class TestWorkerStart(unittest.TestCase):
+ @classmethod
+ @conf_vars({("core", "executor"): "CeleryExecutor"})
+ def setUpClass(cls):
+ importlib.reload(cli)
+ cls.parser = cli.CLIFactory.get_parser()
+
+ def tearDown(self):
+ importlib.reload(cli)
+
+ @mock.patch("airflow.cli.commands.celery_command.setup_locations")
+ @mock.patch('airflow.cli.commands.celery_command.Process')
+ @mock.patch('airflow.cli.commands.celery_command.worker_bin')
+ def test_worker_started_with_required_arguments(self, mock_worker, mock_popen, mock_locations):
+ pid_file = "pid_file"
+ mock_locations.return_value = (pid_file, None, None, None)
+ concurrency = '1'
+ celery_hostname = "celery_hostname"
+ queues = "queue"
+ autoscale = "2,5"
+ args = self.parser.parse_args([
+ 'celery',
+ 'worker',
+ '--autoscale',
+ autoscale,
+ '--concurrency',
+ concurrency,
+ '--celery_hostname',
+ celery_hostname,
+ '--queues',
+ queues
+ ])
+
+ with mock.patch('celery.platforms.check_privileges') as mock_privil:
+ mock_privil.return_value = 0
+ celery_command.worker(args)
+
+ mock_worker.worker.return_value.run.assert_called_once_with(
+ pool='prefork',
+ optimization='fair',
+ O='fair', # noqa
+ queues=queues,
+ pidfile=pid_file,
+ concurrency=int(concurrency),
+ autoscale=autoscale,
+ hostname=celery_hostname,
+ loglevel=conf.get('logging', 'LOGGING_LEVEL'),
+ )