# blob: 640068e08acfd240398c6f5370359886819ff61b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
import io
import unittest
import pytest
from airflow.cli import cli_parser
from airflow.cli.commands import jobs_command
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils.session import create_session
from airflow.utils.state import State
from tests.test_utils.db import clear_db_jobs
class TestCliConfigList(unittest.TestCase):
    """Tests for the ``jobs check`` CLI command (``jobs_command.check``).

    Every test starts and ends with an empty jobs table (``clear_db_jobs``).
    Scheduler jobs created by a test are tracked on the instance
    (``self.scheduler_job`` / ``self.scheduler_jobs``) so that ``tearDown``
    can stop their processor agents even when an assertion fails part-way
    through the test.
    """

    @classmethod
    def setUpClass(cls):
        """Build the CLI argument parser once for all tests in the class."""
        cls.parser = cli_parser.get_parser()

    def setUp(self) -> None:
        """Clear the jobs table and reset the per-test job tracking."""
        clear_db_jobs()
        self.scheduler_job = None
        self.scheduler_jobs = []

    def tearDown(self) -> None:
        """Stop the processor agent of every tracked job, then clear the jobs table."""
        tracked_jobs = list(self.scheduler_jobs)
        if self.scheduler_job:
            tracked_jobs.append(self.scheduler_job)
        try:
            for scheduler_job in tracked_jobs:
                if scheduler_job.processor_agent:
                    scheduler_job.processor_agent.end()
        finally:
            # Always leave a clean table for the next test, even if stopping
            # an agent raised.
            clear_db_jobs()

    def test_should_report_success_for_one_working_scheduler(self):
        """A single running scheduler job is reported as one alive job."""
        with create_session() as session:
            self.scheduler_job = SchedulerJob()
            self.scheduler_job.state = State.RUNNING
            session.add(self.scheduler_job)
            session.commit()
            self.scheduler_job.heartbeat()

        with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
            jobs_command.check(self.parser.parse_args(['jobs', 'check', '--job-type', 'SchedulerJob']))
        self.assertIn("Found one alive job.", temp_stdout.getvalue())

    def test_should_report_success_for_one_working_scheduler_with_hostname(self):
        """A running scheduler job is found when filtering by its hostname."""
        with create_session() as session:
            self.scheduler_job = SchedulerJob()
            self.scheduler_job.state = State.RUNNING
            self.scheduler_job.hostname = 'HOSTNAME'
            session.add(self.scheduler_job)
            session.commit()
            self.scheduler_job.heartbeat()

        with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
            jobs_command.check(
                self.parser.parse_args(
                    ['jobs', 'check', '--job-type', 'SchedulerJob', '--hostname', 'HOSTNAME']
                )
            )
        self.assertIn("Found one alive job.", temp_stdout.getvalue())

    def test_should_report_success_for_ha_schedulers(self):
        """With ``--allow-multiple``, three running scheduler jobs are all reported."""
        with create_session() as session:
            for _ in range(3):
                scheduler_job = SchedulerJob()
                scheduler_job.state = State.RUNNING
                session.add(scheduler_job)
                # Tracked so tearDown stops the agent even if the test fails.
                self.scheduler_jobs.append(scheduler_job)
            session.commit()
            scheduler_job.heartbeat()

        with contextlib.redirect_stdout(io.StringIO()) as temp_stdout:
            jobs_command.check(
                self.parser.parse_args(
                    ['jobs', 'check', '--job-type', 'SchedulerJob', '--limit', '100', '--allow-multiple']
                )
            )
        self.assertIn("Found 3 alive jobs.", temp_stdout.getvalue())

    def test_should_ignore_not_running_jobs(self):
        """Jobs in the SHUTDOWN state do not count as alive."""
        with create_session() as session:
            for _ in range(3):
                scheduler_job = SchedulerJob()
                scheduler_job.state = State.SHUTDOWN
                session.add(scheduler_job)
                self.scheduler_jobs.append(scheduler_job)
            session.commit()

        # No alive jobs found.
        with pytest.raises(SystemExit, match=r"No alive jobs found."):
            jobs_command.check(self.parser.parse_args(['jobs', 'check']))

    def test_should_raise_exception_for_multiple_scheduler_on_one_host(self):
        """Without ``--allow-multiple``, finding three alive jobs exits with an error."""
        with create_session() as session:
            for _ in range(3):
                scheduler_job = SchedulerJob()
                scheduler_job.state = State.RUNNING
                scheduler_job.hostname = 'HOSTNAME'
                session.add(scheduler_job)
                # Previously these jobs were never recorded, so their
                # processor agents were never cleaned up.
                self.scheduler_jobs.append(scheduler_job)
            session.commit()
            scheduler_job.heartbeat()

        with pytest.raises(SystemExit, match=r"Found 3 alive jobs. Expected only one."):
            jobs_command.check(
                self.parser.parse_args(
                    [
                        'jobs',
                        'check',
                        '--job-type',
                        'SchedulerJob',
                        '--limit',
                        '100',
                    ]
                )
            )

    def test_should_raise_exception_for_allow_multiple_and_limit_1(self):
        """``--allow-multiple`` without a limit greater than 1 exits with an error."""
        with pytest.raises(
            SystemExit,
            match=r"To use option --allow-multiple, you must set the limit to a value greater than 1.",
        ):
            jobs_command.check(self.parser.parse_args(['jobs', 'check', '--allow-multiple']))