[AIRFLOW-6620] Mock celery in worker cli test (#7243)

Actual tests start celery worker and doesn't stop it. Sometimes
it result in timeout of a test.
diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py
index ee78fc2..9f8cdfb 100644
--- a/tests/cli/commands/test_celery_command.py
+++ b/tests/cli/commands/test_celery_command.py
@@ -27,6 +27,7 @@
 import airflow
 from airflow.bin import cli
 from airflow.cli.commands import celery_command
+from airflow.configuration import conf
 from tests.test_utils.config import conf_vars
 
 mock.patch('airflow.utils.cli.action_logging', lambda x: x).start()
@@ -42,7 +43,6 @@
         """
         mock_validate_session.return_value = False
         with self.assertRaises(SystemExit) as cm:
-            # airflow.bin.cli.worker(mock_args)
             celery_command.worker(mock_args)
         self.assertEqual(cm.exception.code, 1)
 
@@ -77,18 +77,20 @@
     def tearDown(self):
         importlib.reload(cli)
 
-    def test_serve_logs_on_worker_start(self):
+    @mock.patch('airflow.cli.commands.celery_command.worker_bin')
+    def test_serve_logs_on_worker_start(self, mock_worker):
         with mock.patch('airflow.cli.commands.celery_command.Process') as mock_process:
-            args = self.parser.parse_args(['celery', 'worker', '-c', '-1'])
+            args = self.parser.parse_args(['celery', 'worker', '-c', '1'])
 
             with mock.patch('celery.platforms.check_privileges') as mock_privil:
                 mock_privil.return_value = 0
                 celery_command.worker(args)
                 mock_process.assert_called()
 
-    def test_skip_serve_logs_on_worker_start(self):
+    @mock.patch('airflow.cli.commands.celery_command.worker_bin')
+    def test_skip_serve_logs_on_worker_start(self, mock_worker):
         with mock.patch('airflow.cli.commands.celery_command.Process') as mock_popen:
-            args = self.parser.parse_args(['celery', 'worker', '-c', '-1', '-s'])
+            args = self.parser.parse_args(['celery', 'worker', '-c', '1', '-s'])
 
             with mock.patch('celery.platforms.check_privileges') as mock_privil:
                 mock_privil.return_value = 0
@@ -103,6 +105,9 @@
         importlib.reload(cli)
         cls.parser = cli.CLIFactory.get_parser()
 
+    def tearDown(self):
+        importlib.reload(cli)
+
     @mock.patch("airflow.cli.commands.celery_command.setup_locations")
     @mock.patch("airflow.cli.commands.celery_command.psutil.Process")
     def test_if_right_pid_is_read(self, mock_process, mock_setup_locations):
@@ -147,3 +152,53 @@
         stop_args = self.parser.parse_args(['celery', 'stop'])
         celery_command.stop_worker(stop_args)
         mock_read_pid_from_pidfile.assert_called_once_with(pid_file)
+
+
+class TestWorkerStart(unittest.TestCase):
+    @classmethod
+    @conf_vars({("core", "executor"): "CeleryExecutor"})
+    def setUpClass(cls):
+        importlib.reload(cli)
+        cls.parser = cli.CLIFactory.get_parser()
+
+    def tearDown(self):
+        importlib.reload(cli)
+
+    @mock.patch("airflow.cli.commands.celery_command.setup_locations")
+    @mock.patch('airflow.cli.commands.celery_command.Process')
+    @mock.patch('airflow.cli.commands.celery_command.worker_bin')
+    def test_worker_started_with_required_arguments(self, mock_worker, mock_popen, mock_locations):
+        pid_file = "pid_file"
+        mock_locations.return_value = (pid_file, None, None, None)
+        concurrency = '1'
+        celery_hostname = "celery_hostname"
+        queues = "queue"
+        autoscale = "2,5"
+        args = self.parser.parse_args([
+            'celery',
+            'worker',
+            '--autoscale',
+            autoscale,
+            '--concurrency',
+            concurrency,
+            '--celery_hostname',
+            celery_hostname,
+            '--queues',
+            queues
+        ])
+
+        with mock.patch('celery.platforms.check_privileges') as mock_privil:
+            mock_privil.return_value = 0
+            celery_command.worker(args)
+
+        mock_worker.worker.return_value.run.assert_called_once_with(
+            pool='prefork',
+            optimization='fair',
+            O='fair',  # noqa
+            queues=queues,
+            pidfile=pid_file,
+            concurrency=int(concurrency),
+            autoscale=autoscale,
+            hostname=celery_hostname,
+            loglevel=conf.get('logging', 'LOGGING_LEVEL'),
+        )