| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import datetime |
| import os |
| import shutil |
| import unittest |
| from datetime import timedelta |
| from tempfile import NamedTemporaryFile, mkdtemp |
| |
| import mock |
| import psutil |
| import pytest |
| import six |
| from freezegun import freeze_time |
| from mock import MagicMock, patch |
| from parameterized import parameterized |
| |
| import airflow.example_dags |
| import airflow.smart_sensor_dags |
| from airflow import settings |
| from airflow.configuration import conf |
| from airflow.exceptions import AirflowException |
| from airflow.executors.base_executor import BaseExecutor |
| from airflow.jobs.backfill_job import BackfillJob |
| from airflow.jobs.scheduler_job import DagFileProcessor, SchedulerJob |
| from airflow.models import DAG, DagBag, DagModel, Pool, SlaMiss, TaskInstance, errors |
| from airflow.models.dagrun import DagRun |
| from airflow.models.taskinstance import SimpleTaskInstance, TaskInstanceKey |
| from airflow.operators.bash import BashOperator |
| from airflow.operators.dummy_operator import DummyOperator |
| from airflow.serialization.serialized_objects import SerializedDAG |
| from airflow.utils import timezone |
| from airflow.utils.dag_processing import FailureCallbackRequest, SimpleDagBag |
| from airflow.utils.dates import days_ago |
| from airflow.utils.file import list_py_file_paths |
| from airflow.utils.session import create_session, provide_session |
| from airflow.utils.state import State |
| from airflow.utils.types import DagRunType |
| from tests.test_utils.asserts import assert_queries_count |
| from tests.test_utils.config import conf_vars, env_vars |
| from tests.test_utils.db import ( |
| clear_db_dags, clear_db_errors, clear_db_jobs, clear_db_pools, clear_db_runs, clear_db_sla_miss, |
| set_default_pool_slots, |
| ) |
| from tests.test_utils.mock_executor import MockExecutor |
| |
| ROOT_FOLDER = os.path.realpath( |
| os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir) |
| ) |
| PERF_DAGS_FOLDER = os.path.join(ROOT_FOLDER, "tests", "utils", "perf", "dags") |
| ELASTIC_DAG_FILE = os.path.join(PERF_DAGS_FOLDER, "elastic_dag.py") |
| |
| TEST_DAG_FOLDER = os.environ['AIRFLOW__CORE__DAGS_FOLDER'] |
| DEFAULT_DATE = timezone.datetime(2016, 1, 1) |
| TRY_NUMBER = 1 |
| # Include the words "airflow" and "dag" in the file contents to trick Airflow |
| # into thinking these files contain a DAG (otherwise Airflow will skip them) |
| PARSEABLE_DAG_FILE_CONTENTS = '"airflow DAG"' |
| UNPARSEABLE_DAG_FILE_CONTENTS = 'airflow DAG' |
| |
| # Filename to be used for DAGs that are created in an ad-hoc manner and can be |
| # removed/created at runtime |
| TEMP_DAG_FILENAME = "temp_dag.py" |
| |
| |
| @pytest.fixture(scope="class") |
| def disable_load_example(): |
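| # Patch both the in-process config and the matching environment variable so that |
| # any subprocesses spawned during these tests also see load_examples disabled. |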
| with conf_vars({('core', 'load_examples'): 'false'}): |
| with env_vars({('core', 'load_examples'): 'false'}): |
| yield |
| |
| |
| @pytest.mark.usefixtures("disable_load_example") |
| class TestDagFileProcessor(unittest.TestCase): |
| |
| @staticmethod |
| def clean_db(): |
| clear_db_runs() |
| clear_db_pools() |
| clear_db_dags() |
| clear_db_sla_miss() |
| clear_db_errors() |
| clear_db_jobs() |
| |
| def setUp(self): |
| self.clean_db() |
| |
| # Speed up some tests by not running the tasks, just looking at what we |
| # enqueue! |
| self.null_exec = MockExecutor() |
| |
| def tearDown(self) -> None: |
| self.clean_db() |
| |
| def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timedelta(hours=1), **kwargs): |
| dag = DAG( |
| dag_id='test_scheduler_reschedule', |
| start_date=start_date, |
| # Make sure it only creates a single DAG Run |
| end_date=end_date) |
| dag.clear() |
| dag.is_subdag = False |
| with create_session() as session: |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| orm_dag.is_paused = False |
| session.merge(orm_dag) |
| session.commit() |
| return dag |
| |
| @classmethod |
| @patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True) |
| def setUpClass(cls): |
| # Ensure the DAGs we are looking at from the DB are up-to-date |
| non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False) |
| non_serialized_dagbag.sync_to_db() |
| cls.dagbag = DagBag(read_dags_from_db=True) |
| |
| def test_dag_file_processor_sla_miss_callback(self): |
| """ |
| Test that the dag file processor calls the sla miss callback |
| """ |
| session = settings.Session() |
| |
| sla_callback = MagicMock() |
| |
| # Create dag with a start of 1 day ago, but an sla of 0 |
| # so we'll already have an sla_miss on the books. |
| test_start_date = days_ago(1) |
| dag = DAG(dag_id='test_sla_miss', |
| sla_miss_callback=sla_callback, |
| default_args={'start_date': test_start_date, |
| 'sla': datetime.timedelta()}) |
| |
| task = DummyOperator(task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success')) |
| |
| session.merge(SlaMiss(task_id='dummy', dag_id='test_sla_miss', execution_date=test_start_date)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag_file_processor.manage_slas(dag=dag, session=session) |
| |
| assert sla_callback.called |
| |
| def test_dag_file_processor_sla_miss_callback_invalid_sla(self): |
| """ |
| Test that the dag file processor does not call the sla miss callback when |
| given an invalid sla |
| """ |
| session = settings.Session() |
| |
| sla_callback = MagicMock() |
| |
| # Create dag with a start of 1 day ago, but an sla of 0 |
| # so we'll already have an sla_miss on the books. |
| # Pass anything besides a timedelta object to the sla argument. |
| test_start_date = days_ago(1) |
| dag = DAG(dag_id='test_sla_miss', |
| sla_miss_callback=sla_callback, |
| default_args={'start_date': test_start_date, |
| 'sla': None}) |
| |
| task = DummyOperator(task_id='dummy', dag=dag, owner='airflow') |
| |
| session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success')) |
| |
| session.merge(SlaMiss(task_id='dummy', dag_id='test_sla_miss', execution_date=test_start_date)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag_file_processor.manage_slas(dag=dag, session=session) |
| sla_callback.assert_not_called() |
| |
| def test_scheduler_executor_overflow(self): |
| """ |
| Test that tasks are set back to SCHEDULED and removed from the executor |
| queue in the case of an executor overflow. |
| """ |
| executor = MockExecutor(do_update=True, parallelism=3) |
| |
| with create_session() as session: |
| dagbag = DagBag(dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py"), |
| include_examples=False, |
| include_smart_sensor=False) |
| dag = self.create_test_dag() |
| dag.clear() |
| dagbag.bag_dag(dag=dag, root_dag=dag) |
| dag = self.create_test_dag() |
| dag.clear() |
| task = DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| tis = [] |
| for i in range(1, 10): |
| ti = TaskInstance(task, DEFAULT_DATE + timedelta(days=i)) |
| ti.state = State.SCHEDULED |
| tis.append(ti) |
| session.merge(ti) |
| |
| # scheduler._process_dags(simple_dag_bag) |
| @mock.patch('airflow.jobs.scheduler_job.DagBag', return_value=dagbag) |
| @mock.patch('airflow.jobs.scheduler_job.SchedulerJob._change_state_for_tis_without_dagrun') |
| def do_schedule(mock_dagbag, mock_change_state): |
| # Use an empty file since the above mock will return the |
| # expected DAGs. Also specify only a single file so that it doesn't |
| # try to schedule the above DAG repeatedly. |
| with conf_vars({('core', 'mp_start_method'): 'fork'}): |
| scheduler = SchedulerJob(num_runs=1, |
| executor=executor, |
| subdir=os.path.join(settings.DAGS_FOLDER, |
| "no_dags.py")) |
| scheduler.heartrate = 0 |
| scheduler.run() |
| |
| do_schedule() # pylint: disable=no-value-for-parameter |
| for ti in tis: |
| ti.refresh_from_db() |
| self.assertEqual(len(executor.queued_tasks), 0) |
| |
| successful_tasks = [ti for ti in tis if ti.state == State.SUCCESS] |
| scheduled_tasks = [ti for ti in tis if ti.state == State.SCHEDULED] |
| self.assertEqual(3, len(successful_tasks)) |
| self.assertEqual(6, len(scheduled_tasks)) |
| |
| def test_dag_file_processor_sla_miss_callback_sent_notification(self): |
| """ |
| Test that the dag file processor does not call the sla_miss_callback when a |
| notification has already been sent |
| """ |
| session = settings.Session() |
| |
| # Mock the callback function so we can verify that it was not called |
| sla_callback = MagicMock() |
| |
| # Create dag with a start of 2 days ago, but an sla of 1 day |
| # ago so we'll already have an sla_miss on the books |
| test_start_date = days_ago(2) |
| dag = DAG(dag_id='test_sla_miss', |
| sla_miss_callback=sla_callback, |
| default_args={'start_date': test_start_date, |
| 'sla': datetime.timedelta(days=1)}) |
| |
| task = DummyOperator(task_id='dummy', dag=dag, owner='airflow') |
| |
| # Create a TaskInstance for two days ago |
| session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success')) |
| |
| # Create an SlaMiss where notification was sent, but email was not |
| session.merge(SlaMiss(task_id='dummy', |
| dag_id='test_sla_miss', |
| execution_date=test_start_date, |
| email_sent=False, |
| notification_sent=True)) |
| |
| # Now call manage_slas and see if the sla_miss callback gets called |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag_file_processor.manage_slas(dag=dag, session=session) |
| |
| sla_callback.assert_not_called() |
| |
| def test_dag_file_processor_sla_miss_callback_exception(self): |
| """ |
| Test that the dag file processor gracefully logs an exception if there is a problem |
| calling the sla_miss_callback |
| """ |
| session = settings.Session() |
| |
| sla_callback = MagicMock(side_effect=RuntimeError('Could not call function')) |
| |
| test_start_date = days_ago(2) |
| dag = DAG(dag_id='test_sla_miss', |
| sla_miss_callback=sla_callback, |
| default_args={'start_date': test_start_date}) |
| |
| task = DummyOperator(task_id='dummy', |
| dag=dag, |
| owner='airflow', |
| sla=datetime.timedelta(hours=1)) |
| |
| session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success')) |
| |
| # Create an SlaMiss where notification was sent, but email was not |
| session.merge(SlaMiss(task_id='dummy', |
| dag_id='test_sla_miss', |
| execution_date=test_start_date)) |
| |
| # Now call manage_slas and see if the sla_miss callback gets called |
| mock_log = mock.MagicMock() |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock_log) |
| dag_file_processor.manage_slas(dag=dag, session=session) |
| assert sla_callback.called |
| mock_log.exception.assert_called_once_with( |
| 'Could not call sla_miss_callback for DAG %s', |
| 'test_sla_miss') |
| |
| @mock.patch('airflow.jobs.scheduler_job.send_email') |
| def test_dag_file_processor_only_collect_emails_from_sla_missed_tasks(self, mock_send_email): |
| session = settings.Session() |
| |
| test_start_date = days_ago(2) |
| dag = DAG(dag_id='test_sla_miss', |
| default_args={'start_date': test_start_date, |
| 'sla': datetime.timedelta(days=1)}) |
| |
| email1 = 'test1@test.com' |
| task = DummyOperator(task_id='sla_missed', |
| dag=dag, |
| owner='airflow', |
| email=email1, |
| sla=datetime.timedelta(hours=1)) |
| |
| session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success')) |
| |
| email2 = 'test2@test.com' |
| DummyOperator(task_id='sla_not_missed', |
| dag=dag, |
| owner='airflow', |
| email=email2) |
| |
| session.merge(SlaMiss(task_id='sla_missed', dag_id='test_sla_miss', execution_date=test_start_date)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| |
| dag_file_processor.manage_slas(dag=dag, session=session) |
| |
| self.assertEqual(len(mock_send_email.call_args_list), 1) |
| |
| send_email_to = mock_send_email.call_args_list[0][0][0] |
| self.assertIn(email1, send_email_to) |
| self.assertNotIn(email2, send_email_to) |
| |
| @mock.patch('airflow.jobs.scheduler_job.Stats.incr') |
| @mock.patch("airflow.utils.email.send_email") |
| def test_dag_file_processor_sla_miss_email_exception(self, mock_send_email, mock_stats_incr): |
| """ |
| Test that the dag file processor gracefully logs an exception if there is a problem |
| sending an email |
| """ |
| session = settings.Session() |
| |
| # Mock the callback function so we can verify that it was not called |
| mock_send_email.side_effect = RuntimeError('Could not send an email') |
| |
| test_start_date = days_ago(2) |
| dag = DAG(dag_id='test_sla_miss', |
| default_args={'start_date': test_start_date, |
| 'sla': datetime.timedelta(days=1)}) |
| |
| task = DummyOperator(task_id='dummy', |
| dag=dag, |
| owner='airflow', |
| email='test@test.com', |
| sla=datetime.timedelta(hours=1)) |
| |
| session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success')) |
| |
| # Create an SlaMiss where notification was sent, but email was not |
| session.merge(SlaMiss(task_id='dummy', dag_id='test_sla_miss', execution_date=test_start_date)) |
| |
| mock_log = mock.MagicMock() |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock_log) |
| |
| dag_file_processor.manage_slas(dag=dag, session=session) |
| mock_log.exception.assert_called_once_with( |
| 'Could not send SLA Miss email notification for DAG %s', |
| 'test_sla_miss') |
| mock_stats_incr.assert_called_once_with('sla_email_notification_failure') |
| |
| def test_dag_file_processor_sla_miss_deleted_task(self): |
| """ |
| Test that the dag file processor will not crash when trying to send |
| sla miss notification for a deleted task |
| """ |
| session = settings.Session() |
| |
| test_start_date = days_ago(2) |
| dag = DAG(dag_id='test_sla_miss', |
| default_args={'start_date': test_start_date, |
| 'sla': datetime.timedelta(days=1)}) |
| |
| task = DummyOperator(task_id='dummy', |
| dag=dag, |
| owner='airflow', |
| email='test@test.com', |
| sla=datetime.timedelta(hours=1)) |
| |
| session.merge(TaskInstance(task=task, execution_date=test_start_date, state='success')) |
| |
| # Create an SlaMiss where notification was sent, but email was not |
| session.merge(SlaMiss(task_id='dummy_deleted', dag_id='test_sla_miss', |
| execution_date=test_start_date)) |
| |
| mock_log = mock.MagicMock() |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock_log) |
| dag_file_processor.manage_slas(dag=dag, session=session) |
| |
| def test_dag_file_processor_dagrun_once(self): |
| """ |
| Test that the dag file processor does not create multiple dagruns |
| if a dag is scheduled with @once and a start_date |
| """ |
| dag = DAG( |
| 'test_scheduler_dagrun_once', |
| start_date=timezone.datetime(2015, 1, 1), |
| schedule_interval="@once") |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dag.clear() |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNone(dr) |
| |
| @freeze_time(timezone.datetime(2020, 1, 5)) |
| def test_dag_file_processor_dagrun_with_timedelta_schedule_and_catchup_false(self): |
| """ |
| Test that the dag file processor does not create multiple dagruns |
| if a dag is scheduled with 'timedelta' and catchup=False |
| """ |
| dag = DAG( |
| 'test_scheduler_dagrun_once_with_timedelta_and_catchup_false', |
| start_date=timezone.datetime(2015, 1, 1), |
| schedule_interval=timedelta(days=1), |
| catchup=False) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| self.assertEqual(dr.execution_date, timezone.datetime(2020, 1, 4)) |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNone(dr) |
| |
| @freeze_time(timezone.datetime(2020, 5, 4)) |
| def test_dag_file_processor_dagrun_with_timedelta_schedule_and_catchup_true(self): |
| """ |
| Test that the dag file processor creates multiple dagruns |
| if a dag is scheduled with 'timedelta' and catchup=True |
| """ |
| dag = DAG( |
| 'test_scheduler_dagrun_once_with_timedelta_and_catchup_true', |
| start_date=timezone.datetime(2020, 5, 1), |
| schedule_interval=timedelta(days=1), |
| catchup=True) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 1)) |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 2)) |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| self.assertEqual(dr.execution_date, timezone.datetime(2020, 5, 3)) |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNone(dr) |
| |
| @parameterized.expand([ |
| [State.NONE, None, None], |
| [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30), |
| timezone.utcnow() - datetime.timedelta(minutes=15)], |
| [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30), |
| timezone.utcnow() - datetime.timedelta(minutes=15)], |
| ]) |
| def test_dag_file_processor_process_task_instances(self, state, start_date, end_date): |
| """ |
| Test if _process_task_instances puts the right task instances into the |
| mock_list. |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_process_execute_task', |
| start_date=DEFAULT_DATE) |
| dag_task1 = DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| with create_session() as session: |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| |
| with create_session() as session: |
| tis = dr.get_task_instances(session=session) |
| for ti in tis: |
| ti.state = state |
| ti.start_date = start_date |
| ti.end_date = end_date |
| |
| mock_list = dag_file_processor._process_task_instances(dag, dag_runs=[dr]) |
| |
| self.assertEqual( |
| [(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER)], |
| mock_list |
| ) |
| |
| @parameterized.expand([ |
| [State.NONE, None, None], |
| [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30), |
| timezone.utcnow() - datetime.timedelta(minutes=15)], |
| [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30), |
| timezone.utcnow() - datetime.timedelta(minutes=15)], |
| ]) |
| def test_dag_file_processor_process_task_instances_with_task_concurrency( |
| self, state, start_date, end_date, |
| ): |
| """ |
| Test that _process_task_instances schedules the right task instances when |
| task_concurrency is set. |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_process_execute_task_with_task_concurrency', |
| start_date=DEFAULT_DATE) |
| dag_task1 = DummyOperator( |
| task_id='dummy', |
| task_concurrency=2, |
| dag=dag, |
| owner='airflow') |
| |
| with create_session() as session: |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| |
| with create_session() as session: |
| tis = dr.get_task_instances(session=session) |
| for ti in tis: |
| ti.state = state |
| ti.start_date = start_date |
| ti.end_date = end_date |
| |
| ti_to_schedule = dag_file_processor._process_task_instances(dag, dag_runs=[dr]) |
| |
| assert ti_to_schedule == [ |
| (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), |
| ] |
| |
| @parameterized.expand([ |
| [State.NONE, None, None], |
| [State.UP_FOR_RETRY, timezone.utcnow() - datetime.timedelta(minutes=30), |
| timezone.utcnow() - datetime.timedelta(minutes=15)], |
| [State.UP_FOR_RESCHEDULE, timezone.utcnow() - datetime.timedelta(minutes=30), |
| timezone.utcnow() - datetime.timedelta(minutes=15)], |
| ]) |
| def test_dag_file_processor_process_task_instances_depends_on_past(self, state, start_date, end_date): |
| """ |
| Test that _process_task_instances schedules the right task instances when |
| depends_on_past is set. |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_process_execute_task_depends_on_past', |
| start_date=DEFAULT_DATE, |
| default_args={ |
| 'depends_on_past': True, |
| }, |
| ) |
| dag_task1 = DummyOperator( |
| task_id='dummy1', |
| dag=dag, |
| owner='airflow') |
| dag_task2 = DummyOperator( |
| task_id='dummy2', |
| dag=dag, |
| owner='airflow') |
| |
| with create_session() as session: |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| |
| with create_session() as session: |
| tis = dr.get_task_instances(session=session) |
| for ti in tis: |
| ti.state = state |
| ti.start_date = start_date |
| ti.end_date = end_date |
| |
| ti_to_schedule = dag_file_processor._process_task_instances(dag, dag_runs=[dr]) |
| |
| assert sorted(ti_to_schedule) == [ |
| (dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER), |
| (dag.dag_id, dag_task2.task_id, DEFAULT_DATE, TRY_NUMBER), |
| ] |
| |
| def test_dag_file_processor_do_not_schedule_removed_task(self): |
| dag = DAG( |
| dag_id='test_scheduler_do_not_schedule_removed_task', |
| start_date=DEFAULT_DATE) |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| session.close() |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| |
| dr = DagRun.find(run_id=dr.run_id)[0] |
| # Re-create the DAG, but remove the task |
| dag = DAG( |
| dag_id='test_scheduler_do_not_schedule_removed_task', |
| start_date=DEFAULT_DATE) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| mock_list = dag_file_processor._process_task_instances(dag, dag_runs=[dr]) |
| |
| self.assertEqual([], mock_list) |
| |
| def test_dag_file_processor_do_not_schedule_too_early(self): |
| dag = DAG( |
| dag_id='test_scheduler_do_not_schedule_too_early', |
| start_date=timezone.datetime(2200, 1, 1)) |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| session.close() |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNone(dr) |
| |
| mock_list = dag_file_processor._process_task_instances(dag, dag_runs=[]) |
| self.assertEqual([], mock_list) |
| |
| def test_dag_file_processor_do_not_schedule_without_tasks(self): |
| dag = DAG( |
| dag_id='test_scheduler_do_not_schedule_without_tasks', |
| start_date=DEFAULT_DATE) |
| |
| with create_session() as session: |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear(session=session) |
| dag.start_date = None |
| dr = dag_file_processor.create_dag_run(dag, session=session) |
| self.assertIsNone(dr) |
| |
| def test_dag_file_processor_do_not_run_finished(self): |
| dag = DAG( |
| dag_id='test_scheduler_do_not_run_finished', |
| start_date=DEFAULT_DATE) |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| |
| tis = dr.get_task_instances(session=session) |
| for ti in tis: |
| ti.state = State.SUCCESS |
| |
| session.commit() |
| session.close() |
| |
| mock_list = dag_file_processor._process_task_instances(dag, dag_runs=[dr]) |
| |
| self.assertEqual([], mock_list) |
| |
| def test_dag_file_processor_add_new_task(self): |
| """ |
| Test if a task instance will be added if the dag is updated |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_add_new_task', |
| start_date=DEFAULT_DATE) |
| |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| session.close() |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| |
| tis = dr.get_task_instances() |
| self.assertEqual(len(tis), 1) |
| |
| DummyOperator( |
| task_id='dummy2', |
| dag=dag, |
| owner='airflow') |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor._process_task_instances(dag, dag_runs=[dr]) |
| |
| tis = dr.get_task_instances() |
| self.assertEqual(len(tis), 2) |
| |
| def test_dag_file_processor_verify_max_active_runs(self): |
| """ |
| Test that a dagrun will not be scheduled if max_active_runs has been reached |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_verify_max_active_runs', |
| start_date=DEFAULT_DATE) |
| dag.max_active_runs = 1 |
| |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| session.close() |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNone(dr) |
| |
| def test_dag_file_processor_fail_dagrun_timeout(self): |
| """ |
| Test that a dagrun will be set to failed if it times out |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_fail_dagrun_timeout', |
| start_date=DEFAULT_DATE) |
| dag.dagrun_timeout = datetime.timedelta(seconds=60) |
| |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| dr.start_date = timezone.utcnow() - datetime.timedelta(days=1) |
| session.merge(dr) |
| session.commit() |
| |
| dr2 = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr2) |
| |
| dr.refresh_from_db(session=session) |
| self.assertEqual(dr.state, State.FAILED) |
| |
| def test_dag_file_processor_verify_max_active_runs_and_dagrun_timeout(self): |
| """ |
| Test that a dagrun will not be scheduled if max_active_runs |
| has been reached and dagrun_timeout has not been reached |
| |
| Test that a dagrun will be scheduled if max_active_runs has |
| been reached but dagrun_timeout has also been reached |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_verify_max_active_runs_and_dagrun_timeout', |
| start_date=DEFAULT_DATE) |
| dag.max_active_runs = 1 |
| dag.dagrun_timeout = datetime.timedelta(seconds=60) |
| |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| session.close() |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| |
| # Should not be scheduled as the DagRun has not timed out and max_active_runs is reached |
| new_dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNone(new_dr) |
| |
| # Should be scheduled as dagrun_timeout has passed |
| dr.start_date = timezone.utcnow() - datetime.timedelta(days=1) |
| session.merge(dr) |
| session.commit() |
| new_dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(new_dr) |
| |
| def test_dag_file_processor_max_active_runs_respected_after_clear(self): |
| """ |
| Test if _process_task_instances only schedules ti's up to max_active_runs |
| (related to issue AIRFLOW-137) |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_max_active_runs_respected_after_clear', |
| start_date=DEFAULT_DATE) |
| dag.max_active_runs = 3 |
| |
| dag_task1 = DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| session.close() |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| |
| # First create up to 3 dagruns in RUNNING state. |
| dr1 = dag_file_processor.create_dag_run(dag) |
| assert dr1 is not None |
| dr2 = dag_file_processor.create_dag_run(dag) |
| assert dr2 is not None |
| dr3 = dag_file_processor.create_dag_run(dag) |
| assert dr3 is not None |
| assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 3 |
| |
| # Reduce max_active_runs to 1 |
| dag.max_active_runs = 1 |
| |
| # and schedule them in, so we can check how many |
| # tasks are put on the task_instances_list (should be one, not 3) |
| task_instances_list = dag_file_processor._process_task_instances(dag, dag_runs=[dr1, dr2, dr3]) |
| |
| self.assertEqual([(dag.dag_id, dag_task1.task_id, DEFAULT_DATE, TRY_NUMBER)], task_instances_list) |
| |
| def test_find_dags_to_run_includes_subdags(self): |
| dag = self.dagbag.get_dag('test_subdag_operator') |
| self.assertGreater(len(dag.subdags), 0) |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dags = dag_file_processor._find_dags_to_process(self.dagbag.dags.values()) |
| |
| self.assertIn(dag, dags) |
| for subdag in dag.subdags: |
| self.assertIn(subdag, dags) |
| |
| def test_dag_catchup_option(self): |
| """ |
| Test that a DAG with catchup=False only schedules runs beginning now, not back to the start date |
| """ |
| |
| def setup_dag(dag_id, schedule_interval, start_date, catchup): |
| default_args = { |
| 'owner': 'airflow', |
| 'depends_on_past': False, |
| 'start_date': start_date |
| } |
| dag = DAG(dag_id, |
| schedule_interval=schedule_interval, |
| max_active_runs=1, |
| catchup=catchup, |
| default_args=default_args) |
| |
| op1 = DummyOperator(task_id='t1', dag=dag) |
| op2 = DummyOperator(task_id='t2', dag=dag) |
| op2.set_upstream(op1) |
| op3 = DummyOperator(task_id='t3', dag=dag) |
| op3.set_upstream(op2) |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| session.close() |
| |
| return SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| now = timezone.utcnow() |
| six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace( |
| minute=0, second=0, microsecond=0) |
| half_an_hour_ago = now - datetime.timedelta(minutes=30) |
| two_hours_ago = now - datetime.timedelta(hours=2) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| |
| dag1 = setup_dag(dag_id='dag_with_catchup', |
| schedule_interval='* * * * *', |
| start_date=six_hours_ago_to_the_hour, |
| catchup=True) |
| default_catchup = conf.getboolean('scheduler', 'catchup_by_default') |
| self.assertEqual(default_catchup, True) |
| self.assertEqual(dag1.catchup, True) |
| |
| dag2 = setup_dag(dag_id='dag_without_catchup_ten_minute', |
| schedule_interval='*/10 * * * *', |
| start_date=six_hours_ago_to_the_hour, |
| catchup=False) |
| dr = dag_file_processor.create_dag_run(dag2) |
| # We had better get a dag run |
| self.assertIsNotNone(dr) |
| # The DR should be scheduled in the last half an hour, not 6 hours ago |
| self.assertGreater(dr.execution_date, half_an_hour_ago) |
| # The DR should be scheduled BEFORE now |
| self.assertLess(dr.execution_date, timezone.utcnow()) |
| |
| dag3 = setup_dag(dag_id='dag_without_catchup_hourly', |
| schedule_interval='@hourly', |
| start_date=six_hours_ago_to_the_hour, |
| catchup=False) |
| dr = dag_file_processor.create_dag_run(dag3) |
| # We had better get a dag run |
| self.assertIsNotNone(dr) |
| # The DR should be scheduled in the last 2 hours, not 6 hours ago |
| self.assertGreater(dr.execution_date, two_hours_ago) |
| # The DR should be scheduled BEFORE now |
| self.assertLess(dr.execution_date, timezone.utcnow()) |
| |
| dag4 = setup_dag(dag_id='dag_without_catchup_once', |
| schedule_interval='@once', |
| start_date=six_hours_ago_to_the_hour, |
| catchup=False) |
| dr = dag_file_processor.create_dag_run(dag4) |
| self.assertIsNotNone(dr) |
| |
| def test_dag_file_processor_auto_align(self): |
| """ |
| Test that the schedule_interval will be auto-aligned with the start_date |
| such that if the start_date coincides with the schedule the first |
| execution_date will be start_date, otherwise it will be start_date + |
| interval. |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_auto_align_1', |
| start_date=timezone.datetime(2016, 1, 1, 10, 10, 0), |
| schedule_interval="4 5 * * *" |
| ) |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| self.assertEqual(dr.execution_date, timezone.datetime(2016, 1, 2, 5, 4)) |
| |
| dag = DAG( |
| dag_id='test_scheduler_auto_align_2', |
| start_date=timezone.datetime(2016, 1, 1, 10, 10, 0), |
| schedule_interval="10 10 * * *" |
| ) |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow') |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag.clear() |
| |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| self.assertEqual(dr.execution_date, timezone.datetime(2016, 1, 1, 10, 10)) |
| |
| def test_process_dags_not_create_dagrun_for_subdags(self): |
| dag = self.dagbag.get_dag('test_subdag_operator') |
| |
| scheduler = DagFileProcessor(dag_ids=[dag.dag_id], log=mock.MagicMock()) |
| scheduler._process_task_instances = mock.MagicMock() |
| scheduler.manage_slas = mock.MagicMock() |
| |
| scheduler._process_dags([dag] + dag.subdags) |
| |
| with create_session() as session: |
| sub_dagruns = ( |
| session.query(DagRun).filter(DagRun.dag_id == dag.subdags[0].dag_id).count() |
| ) |
| |
| self.assertEqual(0, sub_dagruns) |
| |
| parent_dagruns = ( |
| session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).count() |
| ) |
| |
| self.assertGreater(parent_dagruns, 0) |
| |
| @patch.object(TaskInstance, 'handle_failure') |
| def test_execute_on_failure_callbacks(self, mock_ti_handle_failure): |
| dagbag = DagBag(dag_folder="/dev/null", include_examples=True) |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| with create_session() as session: |
| session.query(TaskInstance).delete() |
| dag = dagbag.get_dag('example_branch_operator') |
| task = dag.get_task(task_id='run_this_first') |
| |
| ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING) |
| |
| session.add(ti) |
| session.commit() |
| |
| requests = [ |
| FailureCallbackRequest( |
| full_filepath="A", |
| simple_task_instance=SimpleTaskInstance(ti), |
| msg="Message" |
| ) |
| ] |
| dag_file_processor.execute_on_failure_callbacks(dagbag, requests) |
| mock_ti_handle_failure.assert_called_once_with( |
| "Message", |
| conf.getboolean('core', 'unit_test_mode'), |
| mock.ANY |
| ) |
| |
| def test_process_file_should_failure_callback(self): |
| dag_file = os.path.join( |
| os.path.dirname(os.path.realpath(__file__)), '../dags/test_on_failure_callback.py' |
| ) |
| dagbag = DagBag(dag_folder=dag_file, include_examples=False) |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| with create_session() as session, NamedTemporaryFile(delete=False) as callback_file: |
| session.query(TaskInstance).delete() |
| dag = dagbag.get_dag('test_om_failure_callback_dag') |
| task = dag.get_task(task_id='test_om_failure_callback_task') |
| |
| ti = TaskInstance(task, DEFAULT_DATE, State.RUNNING) |
| |
| session.add(ti) |
| session.commit() |
| |
| requests = [ |
| FailureCallbackRequest( |
| full_filepath=dag.full_filepath, |
| simple_task_instance=SimpleTaskInstance(ti), |
| msg="Message" |
| ) |
| ] |
| callback_file.close() |
| |
| with mock.patch.dict("os.environ", {"AIRFLOW_CALLBACK_FILE": callback_file.name}): |
| dag_file_processor.process_file(dag_file, requests) |
| with open(callback_file.name) as callback_file2: |
| content = callback_file2.read() |
| self.assertEqual("Callback fired", content) |
| os.remove(callback_file.name) |
| |
| def test_should_parse_only_unpaused_dags(self): |
| dag_file = os.path.join( |
| os.path.dirname(os.path.realpath(__file__)), '../dags/test_multiple_dags.py' |
| ) |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dagbag = DagBag(dag_folder=dag_file, include_examples=False) |
| dagbag.sync_to_db() |
| with create_session() as session: |
| session.query(TaskInstance).delete() |
| ( |
| session.query(DagModel) |
| .filter(DagModel.dag_id == "test_multiple_dags__dag_1") |
| .update({DagModel.is_paused: True}, synchronize_session=False) |
| ) |
| |
| serialized_dags, import_errors_count = dag_file_processor.process_file( |
| file_path=dag_file, failure_callback_requests=[] |
| ) |
| |
| dags = [SerializedDAG.from_dict(serialized_dag) for serialized_dag in serialized_dags] |
| |
| with create_session() as session: |
| tis = session.query(TaskInstance).all() |
| |
| self.assertEqual(0, import_errors_count) |
| self.assertEqual(['test_multiple_dags__dag_2'], [dag.dag_id for dag in dags]) |
| self.assertEqual({'test_multiple_dags__dag_2'}, {ti.dag_id for ti in tis}) |
| |
| def test_should_mark_dummy_task_as_success(self): |
| dag_file = os.path.join( |
| os.path.dirname(os.path.realpath(__file__)), '../dags/test_only_dummy_tasks.py' |
| ) |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| with create_session() as session: |
| session.query(TaskInstance).delete() |
| session.query(DagModel).delete() |
| |
| dagbag = DagBag(dag_folder=dag_file, include_examples=False) |
| dagbag.sync_to_db() |
| |
| serialized_dags, import_errors_count = dag_file_processor.process_file( |
| file_path=dag_file, failure_callback_requests=[] |
| ) |
| |
| dags = [SerializedDAG.from_dict(serialized_dag) for serialized_dag in serialized_dags] |
| |
| with create_session() as session: |
| tis = session.query(TaskInstance).all() |
| |
| self.assertEqual(0, import_errors_count) |
| self.assertEqual(['test_only_dummy_tasks'], [dag.dag_id for dag in dags]) |
| self.assertEqual(5, len(tis)) |
| self.assertEqual({ |
| ('test_task_a', 'success'), |
| ('test_task_b', None), |
| ('test_task_c', 'success'), |
| ('test_task_on_execute', 'scheduled'), |
| ('test_task_on_success', 'scheduled'), |
| }, {(ti.task_id, ti.state) for ti in tis}) |
| for state, start_date, end_date, duration in [(ti.state, ti.start_date, ti.end_date, ti.duration) for |
| ti in tis]: |
| if state == 'success': |
| self.assertIsNotNone(start_date) |
| self.assertIsNotNone(end_date) |
| self.assertEqual(0.0, duration) |
| else: |
| self.assertIsNone(start_date) |
| self.assertIsNone(end_date) |
| self.assertIsNone(duration) |
| |
| dag_file_processor.process_file( |
| file_path=dag_file, failure_callback_requests=[] |
| ) |
| with create_session() as session: |
| tis = session.query(TaskInstance).all() |
| |
| self.assertEqual(5, len(tis)) |
| self.assertEqual({ |
| ('test_task_a', 'success'), |
| ('test_task_b', 'success'), |
| ('test_task_c', 'success'), |
| ('test_task_on_execute', 'scheduled'), |
| ('test_task_on_success', 'scheduled'), |
| }, {(ti.task_id, ti.state) for ti in tis}) |
| for state, start_date, end_date, duration in [(ti.state, ti.start_date, ti.end_date, ti.duration) for |
| ti in tis]: |
| if state == 'success': |
| self.assertIsNotNone(start_date) |
| self.assertIsNotNone(end_date) |
| self.assertEqual(0.0, duration) |
| else: |
| self.assertIsNone(start_date) |
| self.assertIsNone(end_date) |
| self.assertIsNone(duration) |
| |
| |
| @pytest.mark.heisentests |
| class TestDagFileProcessorQueriesCount(unittest.TestCase): |
| """ |
| These tests are designed to detect changes in the number of queries for different DAG files. |
| |
| Each test has its expected query counts saved in the parameterized tables below. If you make a change |
| that affects the number of queries, please update the tables. |
| |
| These tests allow easy detection when a change is made that affects the performance of the |
| DagFileProcessor. |
| """ |
| |
| def setUp(self) -> None: |
| clear_db_runs() |
| clear_db_pools() |
| clear_db_dags() |
| clear_db_sla_miss() |
| clear_db_errors() |
| |
| @parameterized.expand( |
| [ |
| # pylint: disable=bad-whitespace |
| # expected, dag_count, task_count, start_ago, schedule_interval, shape |
| # One DAG with one task per DAG file |
| ([ 1, 1, 1, 1], 1, 1, "1d", "None", "no_structure"), # noqa |
| ([ 1, 1, 1, 1], 1, 1, "1d", "None", "linear"), # noqa |
| ([ 9, 5, 5, 5], 1, 1, "1d", "@once", "no_structure"), # noqa |
| ([ 9, 5, 5, 5], 1, 1, "1d", "@once", "linear"), # noqa |
| ([ 9, 12, 15, 18], 1, 1, "1d", "30m", "no_structure"), # noqa |
| ([ 9, 12, 15, 18], 1, 1, "1d", "30m", "linear"), # noqa |
| ([ 9, 12, 15, 18], 1, 1, "1d", "30m", "binary_tree"), # noqa |
| ([ 9, 12, 15, 18], 1, 1, "1d", "30m", "star"), # noqa |
| ([ 9, 12, 15, 18], 1, 1, "1d", "30m", "grid"), # noqa |
| # One DAG with five tasks per DAG file |
| ([ 1, 1, 1, 1], 1, 5, "1d", "None", "no_structure"), # noqa |
| ([ 1, 1, 1, 1], 1, 5, "1d", "None", "linear"), # noqa |
| ([ 9, 5, 5, 5], 1, 5, "1d", "@once", "no_structure"), # noqa |
| ([10, 6, 6, 6], 1, 5, "1d", "@once", "linear"), # noqa |
| ([ 9, 12, 15, 18], 1, 5, "1d", "30m", "no_structure"), # noqa |
| ([10, 14, 18, 22], 1, 5, "1d", "30m", "linear"), # noqa |
| ([10, 14, 18, 22], 1, 5, "1d", "30m", "binary_tree"), # noqa |
| ([10, 14, 18, 22], 1, 5, "1d", "30m", "star"), # noqa |
| ([10, 14, 18, 22], 1, 5, "1d", "30m", "grid"), # noqa |
| # 10 DAGs with 10 tasks per DAG file |
| ([ 1, 1, 1, 1], 10, 10, "1d", "None", "no_structure"), # noqa |
| ([ 1, 1, 1, 1], 10, 10, "1d", "None", "linear"), # noqa |
| ([81, 41, 41, 41], 10, 10, "1d", "@once", "no_structure"), # noqa |
| ([91, 51, 51, 51], 10, 10, "1d", "@once", "linear"), # noqa |
| ([81, 111, 111, 111], 10, 10, "1d", "30m", "no_structure"), # noqa |
| ([91, 131, 131, 131], 10, 10, "1d", "30m", "linear"), # noqa |
| ([91, 131, 131, 131], 10, 10, "1d", "30m", "binary_tree"), # noqa |
| ([91, 131, 131, 131], 10, 10, "1d", "30m", "star"), # noqa |
| ([91, 131, 131, 131], 10, 10, "1d", "30m", "grid"), # noqa |
| # pylint: enable=bad-whitespace |
| ] |
| ) |
| def test_process_dags_queries_count( |
| self, expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape |
| ): |
| with mock.patch.dict("os.environ", { |
| "PERF_DAGS_COUNT": str(dag_count), |
| "PERF_TASKS_COUNT": str(task_count), |
| "PERF_START_AGO": start_ago, |
| "PERF_SCHEDULE_INTERVAL": schedule_interval, |
| "PERF_SHAPE": shape, |
| }), conf_vars({ |
| ('scheduler', 'use_job_schedule'): 'True', |
| }): |
| dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, |
| include_examples=False, |
| include_smart_sensor=False) |
| processor = DagFileProcessor([], mock.MagicMock()) |
| for expected_query_count in expected_query_counts: |
| with assert_queries_count(expected_query_count): |
| processor._process_dags(dagbag.dags.values()) |
| |
| @parameterized.expand( |
| [ |
| # pylint: disable=bad-whitespace |
| # expected, dag_count, task_count, start_ago, schedule_interval, shape |
| # One DAG with two tasks per DAG file |
| ([ 5, 5, 5, 5], 1, 1, "1d", "None", "no_structure"), # noqa |
| ([ 5, 5, 5, 5], 1, 1, "1d", "None", "linear"), # noqa |
| ([15, 9, 9, 9], 1, 1, "1d", "@once", "no_structure"), # noqa |
| ([15, 9, 9, 9], 1, 1, "1d", "@once", "linear"), # noqa |
| ([15, 18, 21, 24], 1, 1, "1d", "30m", "no_structure"), # noqa |
| ([15, 18, 21, 24], 1, 1, "1d", "30m", "linear"), # noqa |
| # One DAG with five tasks per DAG file |
| ([ 5, 5, 5, 5], 1, 5, "1d", "None", "no_structure"), # noqa |
| ([ 5, 5, 5, 5], 1, 5, "1d", "None", "linear"), # noqa |
| ([15, 9, 9, 9], 1, 5, "1d", "@once", "no_structure"), # noqa |
| ([16, 10, 10, 10], 1, 5, "1d", "@once", "linear"), # noqa |
| ([15, 18, 21, 24], 1, 5, "1d", "30m", "no_structure"), # noqa |
| ([16, 20, 24, 28], 1, 5, "1d", "30m", "linear"), # noqa |
| # 10 DAGs with 10 tasks per DAG file |
| ([ 5, 5, 5, 5], 10, 10, "1d", "None", "no_structure"), # noqa |
| ([ 5, 5, 5, 5], 10, 10, "1d", "None", "linear"), # noqa |
| ([87, 45, 45, 45], 10, 10, "1d", "@once", "no_structure"), # noqa |
| ([97, 55, 55, 55], 10, 10, "1d", "@once", "linear"), # noqa |
| ([87, 117, 117, 117], 10, 10, "1d", "30m", "no_structure"), # noqa |
| ([97, 137, 137, 137], 10, 10, "1d", "30m", "linear"), # noqa |
| # pylint: enable=bad-whitespace |
| ] |
| ) |
| def test_process_file_queries_count( |
| self, expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape |
| ): |
| with mock.patch.dict("os.environ", { |
| "PERF_DAGS_COUNT": str(dag_count), |
| "PERF_TASKS_COUNT": str(task_count), |
| "PERF_START_AGO": start_ago, |
| "PERF_SCHEDULE_INTERVAL": schedule_interval, |
| "PERF_SHAPE": shape, |
| }), conf_vars({ |
| ('scheduler', 'use_job_schedule'): 'True' |
| }): |
| processor = DagFileProcessor([], mock.MagicMock()) |
| for expected_query_count in expected_query_counts: |
| with assert_queries_count(expected_query_count): |
| processor.process_file(ELASTIC_DAG_FILE, []) |
| |
| |
| @pytest.mark.usefixtures("disable_load_example") |
| class TestSchedulerJob(unittest.TestCase): |
| |
| def setUp(self): |
| clear_db_runs() |
| clear_db_pools() |
| clear_db_dags() |
| clear_db_sla_miss() |
| clear_db_errors() |
| |
| # Speed up some tests by not running the tasks, just looking at what we |
| # enqueue! |
| self.null_exec = MockExecutor() |
| |
| @classmethod |
| @patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True) |
| def setUpClass(cls): |
| # Ensure the DAGs we are looking at from the DB are up-to-date |
| non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False) |
| non_serialized_dagbag.sync_to_db() |
| cls.dagbag = DagBag(read_dags_from_db=True) |
| |
| def test_is_alive(self): |
| job = SchedulerJob(None, heartrate=10, state=State.RUNNING) |
| self.assertTrue(job.is_alive()) |
| |
| job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20) |
| self.assertTrue(job.is_alive()) |
| |
| job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=31) |
| self.assertFalse(job.is_alive()) |
| |
| # Regression test: .seconds was used before instead of total_seconds(); |
| # timedelta stores (days, seconds), so .seconds of a day-old delta is 0 |
| job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(days=1) |
| self.assertFalse(job.is_alive()) |
| |
| job.state = State.SUCCESS |
| job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10) |
| self.assertFalse(job.is_alive(), "Completed jobs even with recent heartbeat should not be alive") |
| |
| def run_single_scheduler_loop_with_no_dags(self, dags_folder): |
| """ |
| Utility function that runs a single scheduler loop without actually |
| changing/scheduling any dags. This is useful to simulate the other side effects of |
| running a scheduler loop, e.g. to see what parse errors there are in the |
| dags_folder. |
| |
| :param dags_folder: the directory to traverse |
| :type dags_folder: str |
| """ |
| scheduler = SchedulerJob( |
| executor=self.null_exec, |
| dag_id='this_dag_doesnt_exist', # We don't want to actually run anything |
| num_runs=1, |
| subdir=os.path.join(dags_folder)) |
| scheduler.heartrate = 0 |
| scheduler.run() |
| |
| def _make_simple_dag_bag(self, dags): |
| return SimpleDagBag([SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) for dag in dags]) |
| |
| def test_no_orphan_process_will_be_left(self): |
| empty_dir = mkdtemp() |
| current_process = psutil.Process() |
| old_children = current_process.children(recursive=True) |
| scheduler = SchedulerJob(subdir=empty_dir, |
| num_runs=1, |
| executor=MockExecutor(do_update=False)) |
| scheduler.run() |
| shutil.rmtree(empty_dir) |
| |
| # Remove potential noise created by previous tests. |
| current_children = set(current_process.children(recursive=True)) - set( |
| old_children) |
| self.assertFalse(current_children) |
| |
| @mock.patch('airflow.jobs.scheduler_job.Stats.incr') |
| def test_process_executor_events(self, mock_stats_incr): |
| dag_id = "test_process_executor_events" |
| dag_id2 = "test_process_executor_events_2" |
| task_id_1 = 'dummy_task' |
| |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, full_filepath="/test_path1/") |
| dag2 = DAG(dag_id=dag_id2, start_date=DEFAULT_DATE, full_filepath="/test_path1/") |
| task1 = DummyOperator(dag=dag, task_id=task_id_1) |
| DummyOperator(dag=dag2, task_id=task_id_1) |
| dag.fileloc = "/test_path1/" |
| dag2.fileloc = "/test_path1/" |
| dagbag1 = self._make_simple_dag_bag([dag]) |
| dagbag2 = self._make_simple_dag_bag([dag2]) |
| |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| ti1 = TaskInstance(task1, DEFAULT_DATE) |
| ti1.state = State.QUEUED |
| session.merge(ti1) |
| session.commit() |
| |
| executor = MockExecutor(do_update=False) |
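| # Simulate the executor reporting a FAILED result while the TI is still QUEUED; |
| # the scheduler should request the failure callback and bump the |
| # 'killed externally' counter. |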
| executor.event_buffer[ti1.key] = State.FAILED, None |
| |
| scheduler.executor = executor |
| |
| scheduler.processor_agent = mock.MagicMock() |
| # dag bag does not contain dag_id |
| scheduler._process_executor_events(simple_dag_bag=dagbag2) |
| ti1.refresh_from_db() |
| self.assertEqual(ti1.state, State.QUEUED) |
| scheduler.processor_agent.send_callback_to_execute.assert_not_called() |
| |
| # dag bag does contain dag_id |
| scheduler._process_executor_events(simple_dag_bag=dagbag1) |
| ti1.refresh_from_db() |
| self.assertEqual(ti1.state, State.QUEUED) |
| scheduler.processor_agent.send_callback_to_execute.assert_called_once_with( |
| full_filepath='/test_path1/', |
| task_instance=mock.ANY, |
| msg='Executor reports task instance ' |
| '<TaskInstance: test_process_executor_events.dummy_task 2016-01-01 00:00:00+00:00 [queued]> ' |
| 'finished (failed) although the task says its queued. (Info: None) ' |
| 'Was the task killed externally?' |
| ) |
| scheduler.processor_agent.reset_mock() |
| |
| # ti in success state |
| ti1.state = State.SUCCESS |
| session.merge(ti1) |
| session.commit() |
| executor.event_buffer[ti1.key] = State.SUCCESS, None |
| |
| scheduler._process_executor_events(simple_dag_bag=dagbag1) |
| ti1.refresh_from_db() |
| self.assertEqual(ti1.state, State.SUCCESS) |
| scheduler.processor_agent.send_callback_to_execute.assert_not_called() |
| |
| mock_stats_incr.assert_called_once_with('scheduler.tasks.killed_externally') |
| |
| def test_process_executor_events_uses_inmemory_try_number(self): |
| execution_date = DEFAULT_DATE |
| dag_id = "dag_id" |
| task_id = "task_id" |
| try_number = 42 |
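| # Deliberately use a try_number that differs from the TI stored in the DB below, so |
| # the event can only be cleared if the scheduler keys it by the in-memory try_number. |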
| |
| scheduler = SchedulerJob() |
| executor = MagicMock() |
| event_buffer = { |
| TaskInstanceKey(dag_id, task_id, execution_date, try_number): (State.SUCCESS, None) |
| } |
| executor.get_event_buffer.return_value = event_buffer |
| scheduler.executor = executor |
| |
| processor_agent = MagicMock() |
| scheduler.processor_agent = processor_agent |
| |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) |
| task = DummyOperator(dag=dag, task_id=task_id) |
| |
| with create_session() as session: |
| ti = TaskInstance(task, DEFAULT_DATE) |
| ti.state = State.SUCCESS |
| session.merge(ti) |
| |
| scheduler._process_executor_events(simple_dag_bag=MagicMock()) |
| # Assert that the event_buffer is empty, i.e. the task was popped using the right |
| # task instance key |
| self.assertEqual(event_buffer, {}) |
| |
| def test_execute_task_instances_is_paused_wont_execute(self): |
| dag_id = 'SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute' |
| task_id_1 = 'dummy_task' |
| |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dr1 = dag_file_processor.create_dag_run(dag) |
| ti1 = TaskInstance(task1, DEFAULT_DATE) |
| ti1.state = State.SCHEDULED |
| dr1.state = State.RUNNING |
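| # Pause the DAG at the DagModel level; the scheduler should then skip the |
| # SCHEDULED task instance and leave it in the SCHEDULED state. |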
| dagmodel = DagModel() |
| dagmodel.dag_id = dag_id |
| dagmodel.is_paused = True |
| session.merge(ti1) |
| session.merge(dr1) |
| session.add(dagmodel) |
| session.commit() |
| |
| scheduler._execute_task_instances(dagbag) |
| ti1.refresh_from_db() |
| self.assertEqual(State.SCHEDULED, ti1.state) |
| |
| def test_execute_task_instances_no_dagrun_task_will_execute(self): |
| """ |
| Tests that tasks without a dagrun still get executed. |
| """ |
| dag_id = 'SchedulerJobTest.test_execute_task_instances_no_dagrun_task_will_execute' |
| task_id_1 = 'dummy_task' |
| |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dag_file_processor.create_dag_run(dag) |
| ti1 = TaskInstance(task1, DEFAULT_DATE) |
| ti1.state = State.SCHEDULED |
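| # Shift the TI's execution_date so it no longer matches the dagrun created above, |
| # exercising the "task instance without a dagrun" path. |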
| ti1.execution_date = ti1.execution_date + datetime.timedelta(days=1) |
| session.merge(ti1) |
| session.commit() |
| |
| scheduler._execute_task_instances(dagbag) |
| ti1.refresh_from_db() |
| self.assertEqual(State.QUEUED, ti1.state) |
| |
| def test_execute_task_instances_backfill_tasks_wont_execute(self): |
| """ |
| Tests that backfill tasks won't get executed. |
| """ |
| dag_id = 'SchedulerJobTest.test_execute_task_instances_backfill_tasks_wont_execute' |
| task_id_1 = 'dummy_task' |
| |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dr1 = dag_file_processor.create_dag_run(dag) |
| dr1.run_type = DagRunType.BACKFILL_JOB.value |
| ti1 = TaskInstance(task1, dr1.execution_date) |
| ti1.refresh_from_db() |
| ti1.state = State.SCHEDULED |
| session.merge(ti1) |
| session.merge(dr1) |
| session.commit() |
| |
| self.assertTrue(dr1.is_backfill) |
| |
| scheduler._execute_task_instances(dagbag) |
| ti1.refresh_from_db() |
| self.assertEqual(State.SCHEDULED, ti1.state) |
| |
| def test_find_executable_task_instances_backfill_nodagrun(self): |
| dag_id = 'SchedulerJobTest.test_find_executable_task_instances_backfill_nodagrun' |
| task_id_1 = 'dummy' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dr1 = dag_file_processor.create_dag_run(dag) |
| dr2 = dag_file_processor.create_dag_run(dag) |
| dr2.run_type = DagRunType.BACKFILL_JOB.value |
| |
| ti_no_dagrun = TaskInstance(task1, DEFAULT_DATE - datetime.timedelta(days=1)) |
| ti_backfill = TaskInstance(task1, dr2.execution_date) |
| ti_with_dagrun = TaskInstance(task1, dr1.execution_date) |
| # ti_with_paused |
| ti_no_dagrun.state = State.SCHEDULED |
| ti_backfill.state = State.SCHEDULED |
| ti_with_dagrun.state = State.SCHEDULED |
| |
| session.merge(dr2) |
| session.merge(ti_no_dagrun) |
| session.merge(ti_backfill) |
| session.merge(ti_with_dagrun) |
| session.commit() |
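# Only the TI without a dag run and the TI from the regular dag run should be
# returned; the backfill TI must be skipped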
| |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| |
| self.assertEqual(2, len(res)) |
res_keys = [ti.key for ti in res]
| self.assertIn(ti_no_dagrun.key, res_keys) |
| self.assertIn(ti_with_dagrun.key, res_keys) |
| |
| def test_find_executable_task_instances_pool(self): |
| dag_id = 'SchedulerJobTest.test_find_executable_task_instances_pool' |
| task_id_1 = 'dummy' |
| task_id_2 = 'dummydummy' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1, pool='a') |
| task2 = DummyOperator(dag=dag, task_id=task_id_2, pool='b') |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dr1 = dag_file_processor.create_dag_run(dag) |
| dr2 = dag_file_processor.create_dag_run(dag) |
| |
| tis = ([ |
| TaskInstance(task1, dr1.execution_date), |
| TaskInstance(task2, dr1.execution_date), |
| TaskInstance(task1, dr2.execution_date), |
| TaskInstance(task2, dr2.execution_date) |
| ]) |
| for ti in tis: |
| ti.state = State.SCHEDULED |
| session.merge(ti) |
| pool = Pool(pool='a', slots=1, description='haha') |
| pool2 = Pool(pool='b', slots=100, description='haha') |
| session.add(pool) |
| session.add(pool2) |
| session.commit() |
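# Pool 'a' has a single slot, so only one of its two scheduled TIs can be
# returned; pool 'b' has room for both of its TIs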
| |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| session.commit() |
| self.assertEqual(3, len(res)) |
| res_keys = [] |
| for ti in res: |
| res_keys.append(ti.key) |
| self.assertIn(tis[0].key, res_keys) |
| self.assertIn(tis[1].key, res_keys) |
| self.assertIn(tis[3].key, res_keys) |
| |
| def test_find_executable_task_instances_in_default_pool(self): |
| set_default_pool_slots(1) |
| |
| dag_id = 'SchedulerJobTest.test_find_executable_task_instances_in_default_pool' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) |
| op1 = DummyOperator(dag=dag, task_id='dummy1') |
| op2 = DummyOperator(dag=dag, task_id='dummy2') |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| executor = MockExecutor(do_update=True) |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob(executor=executor) |
| dr1 = dag_file_processor.create_dag_run(dag) |
| dr2 = dag_file_processor.create_dag_run(dag) |
| |
| ti1 = TaskInstance(task=op1, execution_date=dr1.execution_date) |
| ti2 = TaskInstance(task=op2, execution_date=dr2.execution_date) |
| ti1.state = State.SCHEDULED |
| ti2.state = State.SCHEDULED |
| |
| session = settings.Session() |
| session.merge(ti1) |
| session.merge(ti2) |
| session.commit() |
| |
| # Two tasks w/o pool up for execution and our default pool size is 1 |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| self.assertEqual(1, len(res)) |
| |
| ti2.state = State.RUNNING |
| session.merge(ti2) |
| session.commit() |
| |
# One task w/o pool up for execution and one task running
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| self.assertEqual(0, len(res)) |
| |
| session.close() |
| |
| def test_nonexistent_pool(self): |
| dag_id = 'SchedulerJobTest.test_nonexistent_pool' |
| task_id = 'dummy_wrong_pool' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) |
| task = DummyOperator(dag=dag, task_id=task_id, pool="this_pool_doesnt_exist") |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dr = dag_file_processor.create_dag_run(dag) |
| |
| ti = TaskInstance(task, dr.execution_date) |
| ti.state = State.SCHEDULED |
| session.merge(ti) |
| session.commit() |
| |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| session.commit() |
| self.assertEqual(0, len(res)) |
| |
| def test_find_executable_task_instances_none(self): |
| dag_id = 'SchedulerJobTest.test_find_executable_task_instances_none' |
| task_id_1 = 'dummy' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) |
| DummyOperator(dag=dag, task_id=task_id_1) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dag_file_processor.create_dag_run(dag) |
| session.commit() |
| |
| self.assertEqual(0, len(scheduler._find_executable_task_instances( |
| dagbag, |
| session=session))) |
| |
| def test_find_executable_task_instances_concurrency(self): |
| dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency' |
| task_id_1 = 'dummy' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dr1 = dag_file_processor.create_dag_run(dag) |
| dr2 = dag_file_processor.create_dag_run(dag) |
| dr3 = dag_file_processor.create_dag_run(dag) |
| |
| ti1 = TaskInstance(task1, dr1.execution_date) |
| ti2 = TaskInstance(task1, dr2.execution_date) |
| ti3 = TaskInstance(task1, dr3.execution_date) |
| ti1.state = State.RUNNING |
| ti2.state = State.SCHEDULED |
| ti3.state = State.SCHEDULED |
| session.merge(ti1) |
| session.merge(ti2) |
| session.merge(ti3) |
| |
| session.commit() |
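# dag concurrency is 2 and one TI is already RUNNING, so only one of the
# SCHEDULED TIs can be returned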
| |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| |
| self.assertEqual(1, len(res)) |
res_keys = [ti.key for ti in res]
| self.assertIn(ti2.key, res_keys) |
| |
| ti2.state = State.RUNNING |
| session.merge(ti2) |
| session.commit() |
| |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| |
| self.assertEqual(0, len(res)) |
| |
| def test_find_executable_task_instances_concurrency_queued(self): |
| dag_id = 'SchedulerJobTest.test_find_executable_task_instances_concurrency_queued' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3) |
| task1 = DummyOperator(dag=dag, task_id='dummy1') |
| task2 = DummyOperator(dag=dag, task_id='dummy2') |
| task3 = DummyOperator(dag=dag, task_id='dummy3') |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| dag_run = dag_file_processor.create_dag_run(dag) |
| |
| ti1 = TaskInstance(task1, dag_run.execution_date) |
| ti2 = TaskInstance(task2, dag_run.execution_date) |
| ti3 = TaskInstance(task3, dag_run.execution_date) |
| ti1.state = State.RUNNING |
| ti2.state = State.QUEUED |
| ti3.state = State.SCHEDULED |
| |
| session.merge(ti1) |
| session.merge(ti2) |
| session.merge(ti3) |
| |
| session.commit() |
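# With concurrency=3, the RUNNING and QUEUED TIs already occupy two slots,
# leaving room for exactly one more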
| |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| |
| self.assertEqual(1, len(res)) |
| self.assertEqual(res[0].key, ti3.key) |
| |
| def test_find_executable_task_instances_task_concurrency(self): # pylint: disable=too-many-statements |
| dag_id = 'SchedulerJobTest.test_find_executable_task_instances_task_concurrency' |
| task_id_1 = 'dummy' |
| task_id_2 = 'dummy2' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1, task_concurrency=2) |
| task2 = DummyOperator(dag=dag, task_id=task_id_2) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| executor = MockExecutor(do_update=True) |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob(executor=executor) |
| session = settings.Session() |
| |
| dr1 = dag_file_processor.create_dag_run(dag) |
| dr2 = dag_file_processor.create_dag_run(dag) |
| dr3 = dag_file_processor.create_dag_run(dag) |
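# task1 is capped at task_concurrency=2 across all dag runs; task2 has no per-task limit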
| |
| ti1_1 = TaskInstance(task1, dr1.execution_date) |
| ti2 = TaskInstance(task2, dr1.execution_date) |
| |
| ti1_1.state = State.SCHEDULED |
| ti2.state = State.SCHEDULED |
| session.merge(ti1_1) |
| session.merge(ti2) |
| session.commit() |
| |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| |
| self.assertEqual(2, len(res)) |
| |
| ti1_1.state = State.RUNNING |
| ti2.state = State.RUNNING |
| ti1_2 = TaskInstance(task1, dr2.execution_date) |
| ti1_2.state = State.SCHEDULED |
| session.merge(ti1_1) |
| session.merge(ti2) |
| session.merge(ti1_2) |
| session.commit() |
| |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| |
| self.assertEqual(1, len(res)) |
| |
| ti1_2.state = State.RUNNING |
| ti1_3 = TaskInstance(task1, dr3.execution_date) |
| ti1_3.state = State.SCHEDULED |
| session.merge(ti1_2) |
| session.merge(ti1_3) |
| session.commit() |
| |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| |
| self.assertEqual(0, len(res)) |
| |
| ti1_1.state = State.SCHEDULED |
| ti1_2.state = State.SCHEDULED |
| ti1_3.state = State.SCHEDULED |
| session.merge(ti1_1) |
| session.merge(ti1_2) |
| session.merge(ti1_3) |
| session.commit() |
| |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| |
| self.assertEqual(2, len(res)) |
| |
| ti1_1.state = State.RUNNING |
| ti1_2.state = State.SCHEDULED |
| ti1_3.state = State.SCHEDULED |
| session.merge(ti1_1) |
| session.merge(ti1_2) |
| session.merge(ti1_3) |
| session.commit() |
| |
| res = scheduler._find_executable_task_instances( |
| dagbag, |
| session=session) |
| |
| self.assertEqual(1, len(res)) |
| |
| def test_change_state_for_executable_task_instances_no_tis(self): |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| res = scheduler._change_state_for_executable_task_instances( |
| [], session) |
| self.assertEqual(0, len(res)) |
| |
| def test_change_state_for_executable_task_instances_no_tis_with_state(self): |
| dag_id = 'SchedulerJobTest.test_change_state_for__no_tis_with_state' |
| task_id_1 = 'dummy' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| self._make_simple_dag_bag([dag]) |
| |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dr1 = dag_file_processor.create_dag_run(dag) |
| dr2 = dag_file_processor.create_dag_run(dag) |
| dr3 = dag_file_processor.create_dag_run(dag) |
| |
| ti1 = TaskInstance(task1, dr1.execution_date) |
| ti2 = TaskInstance(task1, dr2.execution_date) |
| ti3 = TaskInstance(task1, dr3.execution_date) |
| ti1.state = State.RUNNING |
| ti2.state = State.RUNNING |
| ti3.state = State.RUNNING |
| session.merge(ti1) |
| session.merge(ti2) |
| session.merge(ti3) |
| |
| session.commit() |
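# None of the TIs are in a schedulable state (all are RUNNING), so nothing
# should be returned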
| |
| res = scheduler._change_state_for_executable_task_instances( |
| [ti1, ti2, ti3], |
| session) |
| self.assertEqual(0, len(res)) |
| |
| def test_enqueue_task_instances_with_queued_state(self): |
| dag_id = 'SchedulerJobTest.test_enqueue_task_instances_with_queued_state' |
| task_id_1 = 'dummy' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dr1 = dag_file_processor.create_dag_run(dag) |
| |
| ti1 = TaskInstance(task1, dr1.execution_date) |
| session.merge(ti1) |
| session.commit() |
| |
| with patch.object(BaseExecutor, 'queue_command') as mock_queue_command: |
| scheduler._enqueue_task_instances_with_queued_state(dagbag, [ti1]) |
| |
| assert mock_queue_command.called |
| |
| def test_execute_task_instances_nothing(self): |
| dag_id = 'SchedulerJobTest.test_execute_task_instances_nothing' |
| task_id_1 = 'dummy' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=2) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = SimpleDagBag([]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dr1 = dag_file_processor.create_dag_run(dag) |
| ti1 = TaskInstance(task1, dr1.execution_date) |
| ti1.state = State.SCHEDULED |
| session.merge(ti1) |
| session.commit() |
| |
| self.assertEqual(0, scheduler._execute_task_instances(dagbag)) |
| |
| def test_execute_task_instances(self): |
| dag_id = 'SchedulerJobTest.test_execute_task_instances' |
| task_id_1 = 'dummy_task' |
| task_id_2 = 'dummy_task_nonexistent_queue' |
# It is important that len(tasks) is less than concurrency: previously,
# scheduler._execute_task_instances only checked the number of tasks once,
# so if concurrency was 3 we could execute arbitrarily many tasks in the
# second run
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=3) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1) |
| task2 = DummyOperator(dag=dag, task_id=task_id_2) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| # create first dag run with 1 running and 1 queued |
| dr1 = dag_file_processor.create_dag_run(dag) |
| ti1 = TaskInstance(task1, dr1.execution_date) |
| ti2 = TaskInstance(task2, dr1.execution_date) |
| ti1.refresh_from_db() |
| ti2.refresh_from_db() |
| ti1.state = State.RUNNING |
| ti2.state = State.RUNNING |
| session.merge(ti1) |
| session.merge(ti2) |
| session.commit() |
| |
| self.assertEqual(State.RUNNING, dr1.state) |
| self.assertEqual( |
| 2, |
| DAG.get_num_task_instances( |
| dag_id, dag.task_ids, states=[State.RUNNING], session=session |
| ) |
| ) |
| |
| # create second dag run |
| dr2 = dag_file_processor.create_dag_run(dag) |
| ti3 = TaskInstance(task1, dr2.execution_date) |
| ti4 = TaskInstance(task2, dr2.execution_date) |
| ti3.refresh_from_db() |
| ti4.refresh_from_db() |
| # manually set to scheduled so we can pick them up |
| ti3.state = State.SCHEDULED |
| ti4.state = State.SCHEDULED |
| session.merge(ti3) |
| session.merge(ti4) |
| session.commit() |
| |
| self.assertEqual(State.RUNNING, dr2.state) |
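# With concurrency=3 and two TIs already RUNNING, only one of the two
# SCHEDULED TIs can be queued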
| |
| res = scheduler._execute_task_instances(dagbag) |
| |
| # check that concurrency is respected |
| ti1.refresh_from_db() |
| ti2.refresh_from_db() |
| ti3.refresh_from_db() |
| ti4.refresh_from_db() |
| self.assertEqual( |
| 3, |
| DAG.get_num_task_instances( |
| dag_id, dag.task_ids, states=[State.RUNNING, State.QUEUED], session=session |
| ) |
| ) |
| self.assertEqual(State.RUNNING, ti1.state) |
| self.assertEqual(State.RUNNING, ti2.state) |
| six.assertCountEqual(self, [State.QUEUED, State.SCHEDULED], [ti3.state, ti4.state]) |
| self.assertEqual(1, res) |
| |
| def test_execute_task_instances_limit(self): |
| dag_id = 'SchedulerJobTest.test_execute_task_instances_limit' |
| task_id_1 = 'dummy_task' |
| task_id_2 = 'dummy_task_2' |
# It is important that len(tasks) is less than concurrency: previously,
# scheduler._execute_task_instances only checked the number of tasks once,
# so if concurrency was 3 we could execute arbitrarily many tasks in the
# second run
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, concurrency=16) |
| task1 = DummyOperator(dag=dag, task_id=task_id_1) |
| task2 = DummyOperator(dag=dag, task_id=task_id_2) |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| scheduler.max_tis_per_query = 3 |
| session = settings.Session() |
| |
| tis = [] |
| for _ in range(0, 4): |
| dr = dag_file_processor.create_dag_run(dag) |
| ti1 = TaskInstance(task1, dr.execution_date) |
| ti2 = TaskInstance(task2, dr.execution_date) |
| tis.append(ti1) |
| tis.append(ti2) |
| ti1.refresh_from_db() |
| ti2.refresh_from_db() |
| ti1.state = State.SCHEDULED |
| ti2.state = State.SCHEDULED |
| session.merge(ti1) |
| session.merge(ti2) |
| session.commit() |
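# max_tis_per_query=3 limits each query batch, but all 8 scheduled TIs should
# still end up QUEUED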
| res = scheduler._execute_task_instances(dagbag) |
| |
| self.assertEqual(8, res) |
| for ti in tis: |
| ti.refresh_from_db() |
| self.assertEqual(State.QUEUED, ti.state) |
| |
| @pytest.mark.quarantined |
| def test_change_state_for_tis_without_dagrun(self): |
| dag1 = DAG(dag_id='test_change_state_for_tis_without_dagrun', start_date=DEFAULT_DATE) |
| |
| DummyOperator(task_id='dummy', dag=dag1, owner='airflow') |
| |
| DummyOperator(task_id='dummy_b', dag=dag1, owner='airflow') |
| dag1 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag1)) |
| |
| dag2 = DAG(dag_id='test_change_state_for_tis_without_dagrun_dont_change', start_date=DEFAULT_DATE) |
| |
| DummyOperator(task_id='dummy', dag=dag2, owner='airflow') |
| |
| dag2 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag2)) |
| |
| dag3 = DAG(dag_id='test_change_state_for_tis_without_dagrun_no_dagrun', start_date=DEFAULT_DATE) |
| |
| DummyOperator(task_id='dummy', dag=dag3, owner='airflow') |
| |
| dag3 = SerializedDAG.from_dict(SerializedDAG.to_dict(dag3)) |
| |
| session = settings.Session() |
| dr1 = dag1.create_dagrun(run_type=DagRunType.SCHEDULED, |
| state=State.RUNNING, |
| execution_date=DEFAULT_DATE, |
| start_date=DEFAULT_DATE, |
| session=session) |
| |
| dr2 = dag2.create_dagrun(run_type=DagRunType.SCHEDULED, |
| state=State.RUNNING, |
| execution_date=DEFAULT_DATE, |
| start_date=DEFAULT_DATE, |
| session=session) |
| |
| ti1a = dr1.get_task_instance(task_id='dummy', session=session) |
| ti1a.state = State.SCHEDULED |
| ti1b = dr1.get_task_instance(task_id='dummy_b', session=session) |
| ti1b.state = State.SUCCESS |
| session.commit() |
| |
| ti2 = dr2.get_task_instance(task_id='dummy', session=session) |
| ti2.state = State.SCHEDULED |
| session.commit() |
| |
| ti3 = TaskInstance(dag3.get_task('dummy'), DEFAULT_DATE) |
| ti3.state = State.SCHEDULED |
| session.merge(ti3) |
| session.commit() |
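# ti3 belongs to a dag with no dag run at all, so it should be reset to NONE;
# TIs whose dag runs are RUNNING keep their state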
| |
| dagbag = self._make_simple_dag_bag([dag1, dag2, dag3]) |
| scheduler = SchedulerJob(num_runs=0) |
| scheduler._change_state_for_tis_without_dagrun( |
| simple_dag_bag=dagbag, |
| old_states=[State.SCHEDULED, State.QUEUED], |
| new_state=State.NONE, |
| session=session) |
| |
| ti1a = dr1.get_task_instance(task_id='dummy', session=session) |
| ti1a.refresh_from_db(session=session) |
| self.assertEqual(ti1a.state, State.SCHEDULED) |
| |
| ti1b = dr1.get_task_instance(task_id='dummy_b', session=session) |
| ti1b.refresh_from_db(session=session) |
| self.assertEqual(ti1b.state, State.SUCCESS) |
| |
| ti2 = dr2.get_task_instance(task_id='dummy', session=session) |
| ti2.refresh_from_db(session=session) |
| self.assertEqual(ti2.state, State.SCHEDULED) |
| |
| ti3.refresh_from_db(session=session) |
| self.assertEqual(ti3.state, State.NONE) |
| |
| dr1.refresh_from_db(session=session) |
| dr1.state = State.FAILED |
| |
# Persist the dag run's FAILED state before re-running the check
| session.merge(dr1) |
| session.commit() |
| |
| scheduler._change_state_for_tis_without_dagrun( |
| simple_dag_bag=dagbag, |
| old_states=[State.SCHEDULED, State.QUEUED], |
| new_state=State.NONE, |
| session=session) |
| ti1a.refresh_from_db(session=session) |
| self.assertEqual(ti1a.state, State.SCHEDULED) |
| |
| # don't touch ti1b |
| ti1b.refresh_from_db(session=session) |
| self.assertEqual(ti1b.state, State.SUCCESS) |
| |
| # don't touch ti2 |
| ti2.refresh_from_db(session=session) |
| self.assertEqual(ti2.state, State.SCHEDULED) |
| |
| def test_change_state_for_tasks_failed_to_execute(self): |
| dag = DAG( |
| dag_id='dag_id', |
| start_date=DEFAULT_DATE) |
| |
| task = DummyOperator( |
| task_id='task_id', |
| dag=dag, |
| owner='airflow') |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| # If there's no left over task in executor.queued_tasks, nothing happens |
| session = settings.Session() |
| scheduler_job = SchedulerJob() |
| mock_logger = mock.MagicMock() |
| test_executor = MockExecutor(do_update=False) |
| scheduler_job.executor = test_executor |
| scheduler_job._logger = mock_logger |
| scheduler_job._change_state_for_tasks_failed_to_execute() # pylint: disable=no-value-for-parameter |
| mock_logger.info.assert_not_called() |
| |
| # Tasks failed to execute with QUEUED state will be set to SCHEDULED state. |
| session.query(TaskInstance).delete() |
| session.commit() |
| key = 'dag_id', 'task_id', DEFAULT_DATE, 1 |
| test_executor.queued_tasks[key] = 'value' |
| ti = TaskInstance(task, DEFAULT_DATE) |
| ti.state = State.QUEUED |
| session.merge(ti) # pylint: disable=no-value-for-parameter |
| session.commit() |
| |
| scheduler_job._change_state_for_tasks_failed_to_execute() # pylint: disable=no-value-for-parameter |
| |
| ti.refresh_from_db() |
| self.assertEqual(State.SCHEDULED, ti.state) |
| |
| # Tasks failed to execute with RUNNING state will not be set to SCHEDULED state. |
| session.query(TaskInstance).delete() |
| session.commit() |
| ti.state = State.RUNNING |
| |
| session.merge(ti) |
| session.commit() |
| |
| scheduler_job._change_state_for_tasks_failed_to_execute() # pylint: disable=no-value-for-parameter |
| |
| ti.refresh_from_db() |
| self.assertEqual(State.RUNNING, ti.state) |
| |
| def test_adopt_or_reset_orphaned_tasks(self): |
| session = settings.Session() |
| dag = DAG( |
| 'test_execute_helper_reset_orphaned_tasks', |
| start_date=DEFAULT_DATE, |
| default_args={'owner': 'owner1'}) |
| |
| with dag: |
| op1 = DummyOperator(task_id='op1') |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag.clear() |
| dr = dag.create_dagrun(run_type=DagRunType.SCHEDULED, |
| state=State.RUNNING, |
| execution_date=DEFAULT_DATE, |
| start_date=DEFAULT_DATE, |
| session=session) |
| dr2 = dag.create_dagrun(run_type=DagRunType.BACKFILL_JOB, |
| state=State.RUNNING, |
| execution_date=DEFAULT_DATE + datetime.timedelta(1), |
| start_date=DEFAULT_DATE, |
| session=session) |
| ti = dr.get_task_instance(task_id=op1.task_id, session=session) |
| ti.state = State.SCHEDULED |
| ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session) |
| ti2.state = State.SCHEDULED |
| session.commit() |
| |
| processor = mock.MagicMock() |
| |
| scheduler = SchedulerJob(num_runs=0) |
| scheduler.processor_agent = processor |
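# Orphaned TIs from the scheduled run should be reset; TIs belonging to the
# backfill run must be left alone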
| |
| scheduler.adopt_or_reset_orphaned_tasks() |
| |
| ti = dr.get_task_instance(task_id=op1.task_id, session=session) |
| self.assertEqual(ti.state, State.NONE) |
| |
| ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session) |
| self.assertEqual(ti2.state, State.SCHEDULED, "Tasks run by Backfill Jobs should not be reset") |
| |
| @parameterized.expand([ |
| [State.UP_FOR_RETRY, State.FAILED], |
| [State.QUEUED, State.NONE], |
| [State.SCHEDULED, State.NONE], |
| [State.UP_FOR_RESCHEDULE, State.NONE], |
| ]) |
| def test_scheduler_loop_should_change_state_for_tis_without_dagrun(self, |
| initial_task_state, |
| expected_task_state): |
| session = settings.Session() |
| dag = DAG( |
| 'test_execute_helper_should_change_state_for_tis_without_dagrun', |
| start_date=DEFAULT_DATE, |
| default_args={'owner': 'owner1'}) |
| |
| with dag: |
| op1 = DummyOperator(task_id='op1') |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| # Create DAG run with FAILED state |
| dag.clear() |
| dr = dag.create_dagrun(run_type=DagRunType.SCHEDULED, |
| state=State.FAILED, |
| execution_date=DEFAULT_DATE, |
| start_date=DEFAULT_DATE, |
| session=session) |
| ti = dr.get_task_instance(task_id=op1.task_id, session=session) |
| ti.state = initial_task_state |
| session.commit() |
| |
# Create scheduler and mock calls to processor. Run duration is set
# to a high value to ensure the loop is entered. Poll interval is 0 to
# avoid sleeping. The done flag is set to True to exit the loop immediately.
| scheduler = SchedulerJob(num_runs=0, processor_poll_interval=0) |
| executor = MockExecutor(do_update=False) |
| scheduler.executor = executor |
| processor = mock.MagicMock() |
| processor.harvest_serialized_dags.return_value = [ |
| SerializedDAG.from_dict(SerializedDAG.to_dict(dag))] |
| processor.done = True |
| scheduler.processor_agent = processor |
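# A single scheduler loop should move the TI from its initial state to the
# expected state, since its dag run is FAILED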
| |
| scheduler._run_scheduler_loop() |
| |
| ti = dr.get_task_instance(task_id=op1.task_id, session=session) |
| self.assertEqual(ti.state, expected_task_state) |
| |
| @provide_session |
| def evaluate_dagrun( |
| self, |
| dag_id, |
| expected_task_states, # dict of task_id: state |
| dagrun_state, |
| run_kwargs=None, |
| advance_execution_date=False, |
| session=None): # pylint: disable=unused-argument |
| |
| """ |
Helper for testing DagRun states with simple two-task DAGs.
| This is hackish: a dag run is created but its tasks are |
| run by a backfill. |
| """ |
| if run_kwargs is None: |
| run_kwargs = {} |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag = self.dagbag.get_dag(dag_id) |
| dr = dag_file_processor.create_dag_run(dag) |
| |
| if advance_execution_date: |
| # run a second time to schedule a dagrun after the start_date |
| dr = dag_file_processor.create_dag_run(dag) |
| ex_date = dr.execution_date |
| |
| for tid, state in expected_task_states.items(): |
| if state != State.FAILED: |
| continue |
| self.null_exec.mock_task_fail(dag_id, tid, ex_date) |
| |
| try: |
| # This needs a _REAL_ dag, not the serialized version |
| dag.run(start_date=ex_date, end_date=ex_date, executor=self.null_exec, **run_kwargs) |
| except AirflowException: |
| pass |
| |
| # test tasks |
| for task_id, expected_state in expected_task_states.items(): |
| task = dag.get_task(task_id) |
| ti = TaskInstance(task, ex_date) |
| ti.refresh_from_db() |
| self.assertEqual(ti.state, expected_state) |
| |
| # load dagrun |
| dr = DagRun.find(dag_id=dag_id, execution_date=ex_date) |
| dr = dr[0] |
| dr.dag = dag |
| |
| self.assertEqual(dr.state, dagrun_state) |
| |
| def test_dagrun_fail(self): |
| """ |
| DagRuns with one failed and one incomplete root task -> FAILED |
| """ |
| self.evaluate_dagrun( |
| dag_id='test_dagrun_states_fail', |
| expected_task_states={ |
| 'test_dagrun_fail': State.FAILED, |
| 'test_dagrun_succeed': State.UPSTREAM_FAILED, |
| }, |
| dagrun_state=State.FAILED) |
| |
| def test_dagrun_success(self): |
| """ |
| DagRuns with one failed and one successful root task -> SUCCESS |
| """ |
| self.evaluate_dagrun( |
| dag_id='test_dagrun_states_success', |
| expected_task_states={ |
| 'test_dagrun_fail': State.FAILED, |
| 'test_dagrun_succeed': State.SUCCESS, |
| }, |
| dagrun_state=State.SUCCESS) |
| |
| def test_dagrun_root_fail(self): |
| """ |
| DagRuns with one successful and one failed root task -> FAILED |
| """ |
| self.evaluate_dagrun( |
| dag_id='test_dagrun_states_root_fail', |
| expected_task_states={ |
| 'test_dagrun_succeed': State.SUCCESS, |
| 'test_dagrun_fail': State.FAILED, |
| }, |
| dagrun_state=State.FAILED) |
| |
| def test_dagrun_root_fail_unfinished(self): |
| """ |
| DagRuns with one unfinished and one failed root task -> RUNNING |
| """ |
| # TODO: this should live in test_dagrun.py |
| # Run both the failed and successful tasks |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag_id = 'test_dagrun_states_root_fail_unfinished' |
| dag = self.dagbag.get_dag(dag_id) |
| dr = dag_file_processor.create_dag_run(dag) |
| self.null_exec.mock_task_fail(dag_id, 'test_dagrun_fail', DEFAULT_DATE) |
| |
| with self.assertRaises(AirflowException): |
| dag.run(start_date=dr.execution_date, end_date=dr.execution_date, executor=self.null_exec) |
| |
| # Mark the successful task as never having run since we want to see if the |
# dagrun will be in a running state despite having an unfinished task.
| with create_session() as session: |
| ti = dr.get_task_instance('test_dagrun_unfinished', session=session) |
| ti.state = State.NONE |
| session.commit() |
| dr.update_state() |
| self.assertEqual(dr.state, State.RUNNING) |
| |
| def test_dagrun_root_after_dagrun_unfinished(self): |
| """ |
| DagRuns with one successful and one future root task -> SUCCESS |
| |
Note: the DagRun state could still be RUNNING during CI.
| """ |
| dag_id = 'test_dagrun_states_root_future' |
| dag = self.dagbag.get_dag(dag_id) |
| scheduler = SchedulerJob( |
| dag_id, |
| num_runs=1, |
| executor=self.null_exec, |
| subdir=dag.fileloc) |
| scheduler.run() |
| |
| first_run = DagRun.find(dag_id=dag_id, execution_date=DEFAULT_DATE)[0] |
| ti_ids = [(ti.task_id, ti.state) for ti in first_run.get_task_instances()] |
| |
| self.assertEqual(ti_ids, [('current', State.SUCCESS)]) |
| self.assertIn(first_run.state, [State.SUCCESS, State.RUNNING]) |
| |
| def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self): |
| """ |
| DagRun is marked a success if ignore_first_depends_on_past=True |
| |
| Test that an otherwise-deadlocked dagrun is marked as a success |
| if ignore_first_depends_on_past=True and the dagrun execution_date |
| is after the start_date. |
| """ |
| self.evaluate_dagrun( |
| dag_id='test_dagrun_states_deadlock', |
| expected_task_states={ |
| 'test_depends_on_past': State.SUCCESS, |
| 'test_depends_on_past_2': State.SUCCESS, |
| }, |
| dagrun_state=State.SUCCESS, |
| advance_execution_date=True, |
| run_kwargs=dict(ignore_first_depends_on_past=True)) |
| |
| def test_dagrun_deadlock_ignore_depends_on_past(self): |
| """ |
| Test that ignore_first_depends_on_past doesn't affect results |
| (this is the same test as |
| test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date except |
| that start_date == execution_date so depends_on_past is irrelevant). |
| """ |
| self.evaluate_dagrun( |
| dag_id='test_dagrun_states_deadlock', |
| expected_task_states={ |
| 'test_depends_on_past': State.SUCCESS, |
| 'test_depends_on_past_2': State.SUCCESS, |
| }, |
| dagrun_state=State.SUCCESS, |
| run_kwargs=dict(ignore_first_depends_on_past=True)) |
| |
| def test_scheduler_start_date(self): |
| """ |
Test that the scheduler respects start_dates, even when DAGs have run
| """ |
| with create_session() as session: |
| dag_id = 'test_start_date_scheduling' |
| dag = self.dagbag.get_dag(dag_id) |
| dag.clear() |
| self.assertGreater(dag.start_date, datetime.datetime.now(timezone.utc)) |
| |
| scheduler = SchedulerJob(dag_id, |
| executor=self.null_exec, |
| subdir=dag.fileloc, |
| num_runs=1) |
| scheduler.run() |
| |
| # zero tasks ran |
| self.assertEqual( |
| len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 0) |
| session.commit() |
| self.assertListEqual([], self.null_exec.sorted_tasks) |
| |
# Previously, running this backfill would kick off the scheduler
# because it would take the most recent run and start from there.
# That behavior still exists, but now it will only do so if the run is
# after the start date
| bf_exec = MockExecutor() |
| backfill = BackfillJob( |
| executor=bf_exec, |
| dag=dag, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE) |
| backfill.run() |
| |
| # one task ran |
| self.assertEqual( |
| len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 1) |
| self.assertListEqual( |
| [ |
| (TaskInstanceKey(dag.dag_id, 'dummy', DEFAULT_DATE, 1), (State.SUCCESS, None)), |
| ], |
| bf_exec.sorted_tasks |
| ) |
| session.commit() |
| |
| scheduler = SchedulerJob(dag_id, |
| executor=self.null_exec, |
| subdir=dag.fileloc, |
| num_runs=1) |
| scheduler.run() |
| |
| # still one task |
| self.assertEqual( |
| len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 1) |
| session.commit() |
| self.assertListEqual([], self.null_exec.sorted_tasks) |
| |
| def test_scheduler_task_start_date(self): |
| """ |
| Test that the scheduler respects task start dates that are different from DAG start dates |
| """ |
| |
| dag_id = 'test_task_start_date_scheduling' |
| dag = self.dagbag.get_dag(dag_id) |
| dag.clear() |
| scheduler = SchedulerJob(dag_id, |
| executor=self.null_exec, |
| subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'), |
| num_runs=2) |
| scheduler.run() |
| |
| session = settings.Session() |
| tiq = session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id) |
| ti1s = tiq.filter(TaskInstance.task_id == 'dummy1').all() |
| ti2s = tiq.filter(TaskInstance.task_id == 'dummy2').all() |
| self.assertEqual(len(ti1s), 0) |
| self.assertEqual(len(ti2s), 2) |
| for task in ti2s: |
| self.assertEqual(task.state, State.SUCCESS) |
| |
| def test_scheduler_multiprocessing(self): |
| """ |
| Test that the scheduler can successfully queue multiple dags in parallel |
| """ |
| dag_ids = ['test_start_date_scheduling', 'test_dagrun_states_success'] |
| for dag_id in dag_ids: |
| dag = self.dagbag.get_dag(dag_id) |
| dag.clear() |
| |
| scheduler = SchedulerJob(dag_ids=dag_ids, |
| executor=self.null_exec, |
| subdir=os.path.join(TEST_DAG_FOLDER, 'test_scheduler_dags.py'), |
| num_runs=1) |
| scheduler.run() |
| |
| # zero tasks ran |
| dag_id = 'test_start_date_scheduling' |
| session = settings.Session() |
| self.assertEqual( |
| len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()), 0) |
| |
| @conf_vars({("core", "mp_start_method"): "spawn"}) |
| def test_scheduler_multiprocessing_with_spawn_method(self): |
| """ |
| Test that the scheduler can successfully queue multiple dags in parallel |
| when using "spawn" mode of multiprocessing. (Fork is default on Linux and older OSX) |
| """ |
| dag_ids = ['test_start_date_scheduling', 'test_dagrun_states_success'] |
| for dag_id in dag_ids: |
| dag = self.dagbag.get_dag(dag_id) |
| dag.clear() |
| |
| scheduler = SchedulerJob(dag_ids=dag_ids, |
| executor=self.null_exec, |
| subdir=os.path.join( |
| TEST_DAG_FOLDER, 'test_scheduler_dags.py'), |
| num_runs=1) |
| |
| scheduler.run() |
| |
| # zero tasks ran |
| dag_id = 'test_start_date_scheduling' |
| with create_session() as session: |
| self.assertEqual( |
| session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).count(), 0) |
| |
| def test_scheduler_verify_pool_full(self): |
| """ |
| Test task instances not queued when pool is full |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_verify_pool_full', |
| start_date=DEFAULT_DATE) |
| |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow', |
| pool='test_scheduler_verify_pool_full') |
| |
| session = settings.Session() |
| pool = Pool(pool='test_scheduler_verify_pool_full', slots=1) |
| session.add(pool) |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| orm_dag.is_paused = False |
| session.merge(orm_dag) |
| session.commit() |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob(executor=self.null_exec) |
| |
| # Create 2 dagruns, which will create 2 task instances. |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| self.assertEqual(dr.execution_date, DEFAULT_DATE) |
| dr = dag_file_processor.create_dag_run(dag) |
| self.assertIsNotNone(dr) |
| dag_runs = DagRun.find(dag_id="test_scheduler_verify_pool_full") |
| task_instances_list = dag_file_processor._process_task_instances(dag, dag_runs=dag_runs) |
| self.assertEqual(len(task_instances_list), 2) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| # Recreated part of the scheduler here, to kick off tasks -> executor |
| for ti_key in task_instances_list: |
| task = dag.get_task(ti_key[1]) |
| ti = TaskInstance(task, ti_key[2]) |
| # Task starts out in the scheduled state. All tasks in the |
| # scheduled state will be sent to the executor |
| ti.state = State.SCHEDULED |
| |
| # Also save this task instance to the DB. |
| session.merge(ti) |
| session.commit() |
| |
| self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition") |
| scheduler._execute_task_instances(dagbag, session=session) |
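# The pool has a single slot, so only one of the two TIs can be queued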
| |
| self.assertEqual(len(scheduler.executor.queued_tasks), 1) |
| |
| def test_scheduler_verify_pool_full_2_slots_per_task(self): |
| """ |
| Test task instances not queued when pool is full. |
| |
| Variation with non-default pool_slots |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_verify_pool_full_2_slots_per_task', |
| start_date=DEFAULT_DATE) |
| |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow', |
| pool='test_scheduler_verify_pool_full_2_slots_per_task', |
| pool_slots=2, |
| ) |
| |
| session = settings.Session() |
| pool = Pool(pool='test_scheduler_verify_pool_full_2_slots_per_task', slots=6) |
| session.add(pool) |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| orm_dag.is_paused = False |
| session.merge(orm_dag) |
| session.commit() |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob(executor=self.null_exec) |
| |
| # Create 5 dagruns, which will create 5 task instances. |
| for _ in range(5): |
| dag_file_processor.create_dag_run(dag) |
| dag_runs = DagRun.find(dag_id="test_scheduler_verify_pool_full_2_slots_per_task") |
| task_instances_list = dag_file_processor._process_task_instances(dag, dag_runs=dag_runs) |
| self.assertEqual(len(task_instances_list), 5) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| # Recreated part of the scheduler here, to kick off tasks -> executor |
| for ti_key in task_instances_list: |
| task = dag.get_task(ti_key[1]) |
| ti = TaskInstance(task, ti_key[2]) |
| # Task starts out in the scheduled state. All tasks in the |
| # scheduled state will be sent to the executor |
| ti.state = State.SCHEDULED |
| |
| # Also save this task instance to the DB. |
| session.merge(ti) |
| session.commit() |
| |
| self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition") |
| scheduler._execute_task_instances(dagbag, session=session) |
| |
| # As tasks require 2 slots, only 3 can fit into 6 available |
| self.assertEqual(len(scheduler.executor.queued_tasks), 3) |
| |
| def test_scheduler_verify_priority_and_slots(self): |
| """ |
| Test task instances with higher priority are not queued |
| when pool does not have enough slots. |
| |
| Though tasks with lower priority might be executed. |
| """ |
| dag = DAG( |
| dag_id='test_scheduler_verify_priority_and_slots', |
| start_date=DEFAULT_DATE) |
| |
| # Medium priority, not enough slots |
| DummyOperator( |
| task_id='test_scheduler_verify_priority_and_slots_t0', |
| dag=dag, |
| owner='airflow', |
| pool='test_scheduler_verify_priority_and_slots', |
| pool_slots=2, |
| priority_weight=2, |
| ) |
| # High priority, occupies first slot |
| DummyOperator( |
| task_id='test_scheduler_verify_priority_and_slots_t1', |
| dag=dag, |
| owner='airflow', |
| pool='test_scheduler_verify_priority_and_slots', |
| pool_slots=1, |
| priority_weight=3, |
| ) |
| # Low priority, occupies second slot |
| DummyOperator( |
| task_id='test_scheduler_verify_priority_and_slots_t2', |
| dag=dag, |
| owner='airflow', |
| pool='test_scheduler_verify_priority_and_slots', |
| pool_slots=1, |
| priority_weight=1, |
| ) |
| |
| session = settings.Session() |
| pool = Pool(pool='test_scheduler_verify_priority_and_slots', slots=2) |
| session.add(pool) |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| orm_dag.is_paused = False |
| session.merge(orm_dag) |
| session.commit() |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob(executor=self.null_exec) |
| |
| dag_file_processor.create_dag_run(dag) |
| dag_runs = DagRun.find(dag_id="test_scheduler_verify_priority_and_slots") |
| task_instances_list = dag_file_processor._process_task_instances(dag, dag_runs=dag_runs) |
| self.assertEqual(len(task_instances_list), 3) |
| dagbag = self._make_simple_dag_bag([dag]) |
| |
| # Recreated part of the scheduler here, to kick off tasks -> executor |
| for ti_key in task_instances_list: |
| task = dag.get_task(ti_key[1]) |
| ti = TaskInstance(task, ti_key[2]) |
| # Task starts out in the scheduled state. All tasks in the |
| # scheduled state will be sent to the executor |
| ti.state = State.SCHEDULED |
| |
| # Also save this task instance to the DB. |
| session.merge(ti) |
| session.commit() |
| |
| self.assertEqual(len(scheduler.executor.queued_tasks), 0, "Check test pre-condition") |
| scheduler._execute_task_instances(dagbag, session=session) |
| |
# Only the two single-slot tasks (t1 and t2) fit into the pool; the 2-slot task does not
| self.assertEqual(len(scheduler.executor.queued_tasks), 2) |
| |
| ti0 = session.query(TaskInstance)\ |
| .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t0').first() |
| self.assertEqual(ti0.state, State.SCHEDULED) |
| |
| ti1 = session.query(TaskInstance)\ |
| .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t1').first() |
| self.assertEqual(ti1.state, State.QUEUED) |
| |
| ti2 = session.query(TaskInstance)\ |
| .filter(TaskInstance.task_id == 'test_scheduler_verify_priority_and_slots_t2').first() |
| self.assertEqual(ti2.state, State.QUEUED) |
| |
| def test_scheduler_reschedule(self): |
| """ |
| Checks if tasks that are not taken up by the executor |
| get rescheduled |
| """ |
| executor = MockExecutor(do_update=False) |
| dagbag = DagBag(dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py")) |
| dagbag.dags.clear() |
| |
| dag = DAG( |
| dag_id='test_scheduler_reschedule', |
| start_date=DEFAULT_DATE) |
| dummy_task = BashOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow', |
| bash_command='echo 1', |
| ) |
| |
| dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag)) |
| dag.clear() |
| dag.is_subdag = False |
| |
| with create_session() as session: |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| orm_dag.is_paused = False |
| session.merge(orm_dag) |
| |
| dagbag.bag_dag(dag=dag, root_dag=dag) |
| |
| @mock.patch('airflow.jobs.scheduler_job.DagBag', return_value=dagbag) |
| def do_schedule(mock_dagbag): |
# Use an empty file since the above mock will return the
# expected DAGs. Also specify only a single file so that it doesn't
# try to schedule the above DAG repeatedly.
| with conf_vars({('core', 'mp_start_method'): 'fork'}): |
| scheduler = SchedulerJob(num_runs=1, |
| executor=executor, |
| subdir=os.path.join(settings.DAGS_FOLDER, |
| "no_dags.py")) |
| scheduler.heartrate = 0 |
| scheduler.run() |
| |
| do_schedule() # pylint: disable=no-value-for-parameter |
| with create_session() as session: |
| ti = session.query(TaskInstance).filter(TaskInstance.dag_id == dag.dag_id, |
| TaskInstance.task_id == dummy_task.task_id).first() |
| self.assertEqual(0, len(executor.queued_tasks)) |
| self.assertEqual(State.SCHEDULED, ti.state) |
| |
| executor.do_update = True |
| do_schedule() # pylint: disable=no-value-for-parameter |
| self.assertEqual(0, len(executor.queued_tasks)) |
| ti.refresh_from_db() |
| self.assertEqual(State.SUCCESS, ti.state) |
| |
| def test_retry_still_in_executor(self): |
| """ |
| Checks if the scheduler does not put a task in limbo, when a task is retried |
| but is still present in the executor. |
| """ |
| executor = MockExecutor(do_update=False) |
| dagbag = DagBag(dag_folder=os.path.join(settings.DAGS_FOLDER, "no_dags.py")) |
| dagbag.dags.clear() |
| |
| dag = DAG( |
| dag_id='test_retry_still_in_executor', |
| start_date=DEFAULT_DATE, |
| schedule_interval="@once") |
| dag_task1 = BashOperator( |
| task_id='test_retry_handling_op', |
| bash_command='exit 1', |
| retries=1, |
| dag=dag, |
| owner='airflow') |
| |
| dag.clear() |
| dag.is_subdag = False |
| |
| with create_session() as session: |
| orm_dag = DagModel(dag_id=dag.dag_id) |
| orm_dag.is_paused = False |
| session.merge(orm_dag) |
| |
| dagbag.bag_dag(dag=dag, root_dag=dag) |
| |
| @mock.patch('airflow.jobs.scheduler_job.DagBag', return_value=dagbag) |
| def do_schedule(mock_dagbag): |
# Use an empty file since the above mock will return the
# expected DAGs. Also specify only a single file so that it doesn't
# try to schedule the above DAG repeatedly.
| scheduler = SchedulerJob(num_runs=1, |
| executor=executor, |
| subdir=os.path.join(settings.DAGS_FOLDER, |
| "no_dags.py")) |
| scheduler.heartrate = 0 |
| scheduler.run() |
| |
| do_schedule() # pylint: disable=no-value-for-parameter |
| with create_session() as session: |
| ti = session.query(TaskInstance).filter(TaskInstance.dag_id == 'test_retry_still_in_executor', |
| TaskInstance.task_id == 'test_retry_handling_op').first() |
| ti.task = dag_task1 |
| |
| # Nothing should be left in the queued_tasks as we don't do update in MockExecutor yet, |
| # and the queued_tasks will be cleared by scheduler job. |
| self.assertEqual(0, len(executor.queued_tasks)) |
| |
| def run_with_error(ti, ignore_ti_state=False): |
| try: |
| ti.run(ignore_ti_state=ignore_ti_state) |
| except AirflowException: |
| pass |
| |
| self.assertEqual(ti.try_number, 1) |
# At this point, the scheduler has tried to schedule the task once and
# heartbeated the executor once, which moved the state of the task from
# SCHEDULED to QUEUED and then back to SCHEDULED. To fail the task execution
# we need to ignore the TaskInstance state, as SCHEDULED is not a valid state
# from which to start executing a task.
| run_with_error(ti, ignore_ti_state=True) |
| self.assertEqual(ti.state, State.UP_FOR_RETRY) |
| self.assertEqual(ti.try_number, 2) |
| |
| with create_session() as session: |
| ti.refresh_from_db(lock_for_update=True, session=session) |
| ti.state = State.SCHEDULED |
| session.merge(ti) |
| |
| # do schedule |
| do_schedule() # pylint: disable=no-value-for-parameter |
| # MockExecutor is not aware of the TaskInstance since we don't do update yet |
| # and no trace of this TaskInstance will be left in the executor. |
| self.assertFalse(executor.has_task(ti)) |
| self.assertEqual(ti.state, State.SCHEDULED) |
| |
| # To verify that task does get re-queued. |
| executor.do_update = True |
| do_schedule() # pylint: disable=no-value-for-parameter |
| ti.refresh_from_db() |
| self.assertEqual(ti.state, State.SUCCESS) |
| |
| @pytest.mark.quarantined |
| def test_retry_handling_job(self): |
| """ |
| Integration test of the scheduler not accidentally resetting |
| the try_numbers for a task |
| """ |
| dag = self.dagbag.get_dag('test_retry_handling_job') |
| dag_task1 = dag.get_task("test_retry_handling_op") |
| dag.clear() |
| |
| scheduler = SchedulerJob(dag_id=dag.dag_id, |
| num_runs=1) |
| scheduler.heartrate = 0 |
| scheduler.run() |
| |
| session = settings.Session() |
| ti = session.query(TaskInstance).filter(TaskInstance.dag_id == dag.dag_id, |
| TaskInstance.task_id == dag_task1.task_id).first() |
| |
| # make sure the counter has increased |
| self.assertEqual(ti.try_number, 2) |
| self.assertEqual(ti.state, State.UP_FOR_RETRY) |
| |
| def test_dag_with_system_exit(self): |
| """ |
Test to check that a DAG with a sys.exit() doesn't break the scheduler.
| """ |
| |
| dag_id = 'exit_test_dag' |
| dag_ids = [dag_id] |
| dag_directory = os.path.join(settings.DAGS_FOLDER, "..", "dags_with_system_exit") |
| dag_file = os.path.join(dag_directory, 'b_test_scheduler_dags.py') |
| |
| dagbag = DagBag(dag_folder=dag_file) |
| for dag_id in dag_ids: |
| dag = dagbag.get_dag(dag_id) |
| dag.clear() |
| |
| scheduler = SchedulerJob(dag_ids=dag_ids, |
| executor=self.null_exec, |
| subdir=dag_directory, |
| num_runs=1) |
| scheduler.run() |
| with create_session() as session: |
| tis = session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all() |
# Since this dag has no end date, there's a chance that we'll
# start and finish two dag parsing processes in one loop!
| self.assertGreaterEqual( |
| len(tis), 1, |
| repr(tis)) |
| |
| def test_dag_get_active_runs(self): |
| """ |
| Test to check that a DAG returns its active runs |
| """ |
| |
| now = timezone.utcnow() |
| six_hours_ago_to_the_hour = \ |
| (now - datetime.timedelta(hours=6)).replace(minute=0, second=0, microsecond=0) |
| |
| start_date = six_hours_ago_to_the_hour |
| dag_name1 = 'get_active_runs_test' |
| |
| default_args = { |
| 'owner': 'airflow', |
| 'depends_on_past': False, |
| 'start_date': start_date |
| |
| } |
| dag1 = DAG(dag_name1, |
| schedule_interval='* * * * *', |
| max_active_runs=1, |
| default_args=default_args |
| ) |
| |
| run_this_1 = DummyOperator(task_id='run_this_1', dag=dag1) |
| run_this_2 = DummyOperator(task_id='run_this_2', dag=dag1) |
| run_this_2.set_upstream(run_this_1) |
| run_this_3 = DummyOperator(task_id='run_this_3', dag=dag1) |
| run_this_3.set_upstream(run_this_2) |
| |
| session = settings.Session() |
| orm_dag = DagModel(dag_id=dag1.dag_id) |
| session.merge(orm_dag) |
| session.commit() |
| session.close() |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag1.clear() |
| |
| dr = dag_file_processor.create_dag_run(dag1) |
| |
| # We had better get a dag run |
| self.assertIsNotNone(dr) |
| |
| execution_date = dr.execution_date |
| |
| running_dates = dag1.get_active_runs() |
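# The only active run should be the one just created, so its execution date
# must match the run's execution date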
| |
| try: |
| running_date = running_dates[0] |
| except Exception: # pylint: disable=broad-except |
| running_date = 'Except' |
| |
| self.assertEqual(execution_date, running_date, 'Running Date must match Execution Date') |
| |
| def test_add_unparseable_file_before_sched_start_creates_import_error(self): |
| dags_folder = mkdtemp() |
| try: |
| with env_vars({('core', 'dags_folder'): dags_folder}): |
| unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME) |
| with open(unparseable_filename, 'w') as unparseable_file: |
| unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS) |
| self.run_single_scheduler_loop_with_no_dags(dags_folder) |
| finally: |
| shutil.rmtree(dags_folder) |
| |
| with create_session() as session: |
| import_errors = session.query(errors.ImportError).all() |
| |
| self.assertEqual(len(import_errors), 1) |
| import_error = import_errors[0] |
| self.assertEqual(import_error.filename, |
| unparseable_filename) |
| self.assertEqual(import_error.stacktrace, |
| "invalid syntax ({}, line 1)".format(TEMP_DAG_FILENAME)) |
| |
| def test_add_unparseable_file_after_sched_start_creates_import_error(self): |
| dags_folder = mkdtemp() |
| try: |
| unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME) |
| self.run_single_scheduler_loop_with_no_dags(dags_folder) |
| |
| with open(unparseable_filename, 'w') as unparseable_file: |
| unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS) |
| self.run_single_scheduler_loop_with_no_dags(dags_folder) |
| finally: |
| shutil.rmtree(dags_folder) |
| |
| with create_session() as session: |
| import_errors = session.query(errors.ImportError).all() |
| |
| self.assertEqual(len(import_errors), 1) |
| import_error = import_errors[0] |
| self.assertEqual(import_error.filename, |
| unparseable_filename) |
| self.assertEqual(import_error.stacktrace, |
| "invalid syntax ({}, line 1)".format(TEMP_DAG_FILENAME)) |
| |
| def test_no_import_errors_with_parseable_dag(self): |
| try: |
| dags_folder = mkdtemp() |
| parseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME) |
| |
| with open(parseable_filename, 'w') as parseable_file: |
| parseable_file.writelines(PARSEABLE_DAG_FILE_CONTENTS) |
| self.run_single_scheduler_loop_with_no_dags(dags_folder) |
| finally: |
| shutil.rmtree(dags_folder) |
| |
| with create_session() as session: |
| import_errors = session.query(errors.ImportError).all() |
| |
| self.assertEqual(len(import_errors), 0) |
| |
| def test_new_import_error_replaces_old(self): |
| try: |
| dags_folder = mkdtemp() |
| unparseable_filename = os.path.join(dags_folder, TEMP_DAG_FILENAME) |
| |
| # Generate original import error |
| with open(unparseable_filename, 'w') as unparseable_file: |
| unparseable_file.writelines(UNPARSEABLE_DAG_FILE_CONTENTS) |
| self.run_single_scheduler_loop_with_no_dags(dags_folder) |
| |
| # Generate replacement import error (the error will be on the second line now) |
| with open(unparseable_filename, 'w') as unparseable_file: |
| unparseable_file.writelines( |
| PARSEABLE_DAG_FILE_CONTENTS + |
| os.linesep + |
| UNPARSEABLE_DAG_FILE_CONTENTS) |
| self.run_single_scheduler_loop_with_no_dags(dags_folder) |
| finally: |
| shutil.rmtree(dags_folder) |
| |
| session = settings.Session() |
| import_errors = session.query(errors.ImportError).all() |
| |
| self.assertEqual(len(import_errors), 1) |
| import_error = import_errors[0] |
| self.assertEqual(import_error.filename, |
| unparseable_filename) |
| self.assertEqual(import_error.stacktrace, |
| "invalid syntax ({}, line 2)".format(TEMP_DAG_FILENAME)) |
| |
| def test_remove_error_clears_import_error(self): |
| try: |
| dags_folder = mkdtemp() |
| filename_to_parse = os.path.join(dags_folder, TEMP_DAG_FILENAME) |
| |
| # Generate original import error |
| with open(filename_to_parse, 'w') as file_to_parse: |
| file_to_parse.writelines(UNPARSEABLE_DAG_FILE_CONTENTS) |
| self.run_single_scheduler_loop_with_no_dags(dags_folder) |
| |
| # Remove the import error from the file |
| with open(filename_to_parse, 'w') as file_to_parse: |
| file_to_parse.writelines( |
| PARSEABLE_DAG_FILE_CONTENTS) |
| self.run_single_scheduler_loop_with_no_dags(dags_folder) |
| finally: |
| shutil.rmtree(dags_folder) |
| |
| session = settings.Session() |
| import_errors = session.query(errors.ImportError).all() |
| |
| self.assertEqual(len(import_errors), 0) |
| |
| def test_remove_file_clears_import_error(self): |
| try: |
| dags_folder = mkdtemp() |
| filename_to_parse = os.path.join(dags_folder, TEMP_DAG_FILENAME) |
| |
| # Generate original import error |
| with open(filename_to_parse, 'w') as file_to_parse: |
| file_to_parse.writelines(UNPARSEABLE_DAG_FILE_CONTENTS) |
| self.run_single_scheduler_loop_with_no_dags(dags_folder) |
| finally: |
| shutil.rmtree(dags_folder) |
| |
| # Rerun the scheduler once the dag file has been removed |
| self.run_single_scheduler_loop_with_no_dags(dags_folder) |
| |
| with create_session() as session: |
| import_errors = session.query(errors.ImportError).all() |
| |
| self.assertEqual(len(import_errors), 0) |
| |
| def test_list_py_file_paths(self): |
| """ |
| [JIRA-1357] Test the 'list_py_file_paths' function used by the |
| scheduler to list and load DAGs. |
| """ |
| detected_files = set() |
| expected_files = set() |
| # no_dags.py is empty and the files matching _invalid_ are excluded via .airflowignore |
| ignored_files = { |
| 'no_dags.py', |
| 'test_invalid_cron.py', |
| 'test_zip_invalid_cron.zip', |
| 'test_ignore_this.py', |
| } |
| for root, _, files in os.walk(TEST_DAG_FOLDER): # pylint: disable=too-many-nested-blocks |
| for file_name in files: |
| if file_name.endswith('.py') or file_name.endswith('.zip'): |
| if file_name not in ignored_files: |
| expected_files.add(os.path.join(root, file_name)) |
| for file_path in list_py_file_paths(TEST_DAG_FOLDER, include_examples=False): |
| detected_files.add(file_path) |
| self.assertEqual(detected_files, expected_files) |
| |
| ignored_files = { |
| 'helper.py', |
| } |
| example_dag_folder = airflow.example_dags.__path__[0] |
| for root, _, files in os.walk(example_dag_folder): # pylint: disable=too-many-nested-blocks |
| for file_name in files: |
| if file_name.endswith('.py') or file_name.endswith('.zip'): |
| if file_name not in ['__init__.py'] and file_name not in ignored_files: |
| expected_files.add(os.path.join(root, file_name)) |
| detected_files.clear() |
| for file_path in list_py_file_paths(TEST_DAG_FOLDER, include_examples=True): |
| detected_files.add(file_path) |
| self.assertEqual(detected_files, expected_files) |
| |
| smart_sensor_dag_folder = airflow.smart_sensor_dags.__path__[0] |
| for root, _, files in os.walk(smart_sensor_dag_folder): |
| for file_name in files: |
| if (file_name.endswith('.py') or file_name.endswith('.zip')) and \ |
| file_name not in ['__init__.py']: |
| expected_files.add(os.path.join(root, file_name)) |
| detected_files.clear() |
| for file_path in list_py_file_paths(TEST_DAG_FOLDER, |
| include_examples=True, |
| include_smart_sensor=True): |
| detected_files.add(file_path) |
| self.assertEqual(detected_files, expected_files) |
| |
| def test_adopt_or_reset_orphaned_tasks_nothing(self): |
| """Try with nothing. """ |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| self.assertEqual(0, scheduler.adopt_or_reset_orphaned_tasks(session=session)) |
| |
| def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self): |
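| """A scheduled task instance in a running, externally triggered dag run should still be reset.""" |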
| dag_id = 'test_reset_orphaned_tasks_external_triggered_dag' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') |
| task_id = dag_id + '_task' |
| DummyOperator(task_id=task_id, dag=dag) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| dr1 = dag_file_processor.create_dag_run(dag, session=session) |
| ti = dr1.get_task_instances(session=session)[0] |
| dr1.state = State.RUNNING |
| ti.state = State.SCHEDULED |
| dr1.external_trigger = True |
| session.merge(ti) |
| session.merge(dr1) |
| session.commit() |
| |
| num_reset_tis = scheduler.adopt_or_reset_orphaned_tasks(session=session) |
| self.assertEqual(1, num_reset_tis) |
| |
| def test_adopt_or_reset_orphaned_tasks_backfill_dag(self): |
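| """Task instances belonging to a backfill dag run should not be adopted or reset.""" |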
| dag_id = 'test_adopt_or_reset_orphaned_tasks_backfill_dag' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') |
| task_id = dag_id + '_task' |
| DummyOperator(task_id=task_id, dag=dag) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| session.add(scheduler) |
| session.flush() |
| |
| dr1 = dag_file_processor.create_dag_run(dag, session=session) |
| ti = dr1.get_task_instances(session=session)[0] |
| ti.state = State.SCHEDULED |
| dr1.state = State.RUNNING |
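| # Mark the run as a backfill so that adopt_or_reset_orphaned_tasks skips it |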
| dr1.run_type = DagRunType.BACKFILL_JOB.value |
| session.merge(ti) |
| session.merge(dr1) |
| session.flush() |
| |
| self.assertTrue(dr1.is_backfill) |
| self.assertEqual(0, scheduler.adopt_or_reset_orphaned_tasks(session=session)) |
| session.rollback() |
| |
| def test_reset_orphaned_tasks_nonexistent_dagrun(self): |
| """Make sure a task in an orphaned state is not reset if it has no dagrun. """ |
| dag_id = 'test_reset_orphaned_tasks_nonexistent_dagrun' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') |
| task_id = dag_id + '_task' |
| task = DummyOperator(task_id=task_id, dag=dag) |
| |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| |
| ti = TaskInstance(task=task, execution_date=DEFAULT_DATE) |
| session.add(ti) |
| session.flush() |
| |
| ti.refresh_from_db() |
| ti.state = State.SCHEDULED |
| session.merge(ti) |
| session.flush() |
| |
| self.assertEqual(0, scheduler.adopt_or_reset_orphaned_tasks(session=session)) |
| session.rollback() |
| |
| def test_reset_orphaned_tasks_no_orphans(self): |
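| """A running task instance queued by the scheduler performing the check is not an orphan and keeps its state.""" |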
| dag_id = 'test_reset_orphaned_tasks_no_orphans' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') |
| task_id = dag_id + '_task' |
| DummyOperator(task_id=task_id, dag=dag) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| session.add(scheduler) |
| session.flush() |
| |
| dr1 = dag_file_processor.create_dag_run(dag, session=session) |
| dr1.state = State.RUNNING |
| tis = dr1.get_task_instances(session=session) |
| tis[0].state = State.RUNNING |
| tis[0].queued_by_job_id = scheduler.id |
| session.merge(dr1) |
| session.merge(tis[0]) |
| session.flush() |
| |
| self.assertEqual(0, scheduler.adopt_or_reset_orphaned_tasks(session=session)) |
| tis[0].refresh_from_db() |
| self.assertEqual(State.RUNNING, tis[0].state) |
| |
| def test_reset_orphaned_tasks_non_running_dagruns(self): |
| """Ensure orphaned tasks with non-running dagruns are not reset.""" |
| dag_id = 'test_reset_orphaned_tasks_non_running_dagruns' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') |
| task_id = dag_id + '_task' |
| DummyOperator(task_id=task_id, dag=dag) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler = SchedulerJob() |
| session = settings.Session() |
| session.add(scheduler) |
| session.flush() |
| |
| dr1 = dag_file_processor.create_dag_run(dag, session=session) |
| dr1.state = State.SUCCESS |
| tis = dr1.get_task_instances(session=session) |
| self.assertEqual(1, len(tis)) |
| tis[0].state = State.SCHEDULED |
| tis[0].queued_by_job_id = scheduler.id |
| session.merge(dr1) |
| session.merge(tis[0]) |
| session.flush() |
| |
| self.assertEqual(0, scheduler.adopt_or_reset_orphaned_tasks(session=session)) |
| session.rollback() |
| |
| def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self): |
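| """Only task instances queued by a scheduler job with a stale heartbeat should be adopted or reset.""" |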
| dag_id = 'test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') |
| DummyOperator(task_id='task1', dag=dag) |
| DummyOperator(task_id='task2', dag=dag) |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| scheduler_job = SchedulerJob() |
| session = settings.Session() |
| scheduler_job.state = State.RUNNING |
| scheduler_job.latest_heartbeat = timezone.utcnow() |
| session.add(scheduler_job) |
| |
| old_job = SchedulerJob() |
| old_job.state = State.RUNNING |
| old_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15) |
| session.add(old_job) |
| session.flush() |
| |
| dr1 = dag_file_processor.create_dag_run(dag, session=session) |
| ti1, ti2 = dr1.get_task_instances(session=session) |
| dr1.state = State.RUNNING |
| ti1.state = State.SCHEDULED |
| ti1.queued_by_job_id = old_job.id |
| session.merge(dr1) |
| session.merge(ti1) |
| |
| ti2.state = State.SCHEDULED |
| ti2.queued_by_job_id = scheduler_job.id |
| session.merge(ti2) |
| session.flush() |
| |
| num_reset_tis = scheduler_job.adopt_or_reset_orphaned_tasks(session=session) |
| session.flush() |
| self.assertEqual(1, num_reset_tis) |
| |
| session.refresh(ti1) |
| self.assertIsNone(ti1.state) |
| session.refresh(ti2) |
| self.assertEqual(State.SCHEDULED, ti2.state) |
| session.rollback() |
| |
| |
| def test_task_with_upstream_skip_process_task_instances(): |
| """ |
| Test if _process_task_instances puts a task instance into SKIPPED state if any of its |
| upstream tasks are skipped according to TriggerRuleDep. |
| """ |
| clear_db_runs() |
| with DAG( |
| dag_id='test_task_with_upstream_skip_dag', |
| start_date=DEFAULT_DATE, |
| schedule_interval=None |
| ) as dag: |
| dummy1 = DummyOperator(task_id='dummy1') |
| dummy2 = DummyOperator(task_id="dummy2") |
| dummy3 = DummyOperator(task_id="dummy3") |
| [dummy1, dummy2] >> dummy3 |
| |
| dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock()) |
| dag.clear() |
| dr = dag.create_dagrun(run_type=DagRunType.MANUAL, |
| state=State.RUNNING, |
| execution_date=DEFAULT_DATE) |
| assert dr is not None |
| |
| with create_session() as session: |
| tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)} |
| # Set dummy1 to skipped and dummy2 to success. dummy3 remains as none. |
| tis[dummy1.task_id].state = State.SKIPPED |
| tis[dummy2.task_id].state = State.SUCCESS |
| assert tis[dummy3.task_id].state == State.NONE |
| |
| dag_runs = DagRun.find(dag_id='test_task_with_upstream_skip_dag') |
| dag_file_processor._process_task_instances(dag, dag_runs=dag_runs) |
| |
| with create_session() as session: |
| tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)} |
| assert tis[dummy1.task_id].state == State.SKIPPED |
| assert tis[dummy2.task_id].state == State.SUCCESS |
| # dummy3 should be skipped because dummy1 is skipped. |
| assert tis[dummy3.task_id].state == State.SKIPPED |
| |
| |
| class TestSchedulerJobQueriesCount(unittest.TestCase): |
| """ |
| These tests are designed to detect changes in the number of queries executed |
| when scheduling different DAG files, so that changes which affect the |
| performance of the SchedulerJob are easy to spot. |
| """ |
| |
| def setUp(self) -> None: |
| clear_db_runs() |
| clear_db_pools() |
| clear_db_dags() |
| clear_db_sla_miss() |
| clear_db_errors() |
| |
| @parameterized.expand( |
| [ |
| # pylint: disable=bad-whitespace |
| # expected, dag_count, task_count |
| # One DAG with one task per DAG file |
| (13, 1, 1), # noqa |
| # One DAG with five tasks per DAG file |
| (17, 1, 5), # noqa |
| # 10 DAGs with 10 tasks per DAG file |
| (46, 10, 10), # noqa |
| ] |
| ) |
| def test_execute_queries_count_with_harvested_dags(self, expected_query_count, dag_count, task_count): |
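| """Check how many queries a single scheduler loop runs when the processor agent hands back serialized DAGs.""" |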
| with mock.patch.dict("os.environ", { |
| "PERF_DAGS_COUNT": str(dag_count), |
| "PERF_TASKS_COUNT": str(task_count), |
| "PERF_START_AGO": "1d", |
| "PERF_SCHEDULE_INTERVAL": "30m", |
| "PERF_SHAPE": "no_structure", |
| }), conf_vars({ |
| ('scheduler', 'use_job_schedule'): 'True', |
| ('core', 'load_examples'): 'False', |
| }): |
| |
| dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False) |
| for i, dag in enumerate(dagbag.dags.values()): |
| dr = dag.create_dagrun(state=State.RUNNING, run_id=f"{DagRunType.MANUAL.value}__{i}") |
| for ti in dr.get_task_instances(): |
| ti.set_state(state=State.SCHEDULED) |
| |
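| # The processor agent is mocked to hand back freshly serialized copies of the parsed DAGs |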
| mock_agent = mock.MagicMock() |
| mock_agent.harvest_serialized_dags.return_value = [ |
| SerializedDAG.from_dict(SerializedDAG.to_dict(d)) for d in dagbag.dags.values()] |
| |
| job = SchedulerJob(subdir=PERF_DAGS_FOLDER) |
| job.executor = MockExecutor() |
| job.heartbeat = mock.MagicMock() |
| job.processor_agent = mock_agent |
| |
| with assert_queries_count(expected_query_count): |
| job._run_scheduler_loop() |
| |
| @parameterized.expand( |
| [ |
| # pylint: disable=bad-whitespace |
| # expected, dag_count, task_count |
| # One DAG with one task per DAG file |
| (2, 1, 1), # noqa |
| # One DAG with five tasks per DAG file |
| (2, 1, 5), # noqa |
| # 10 DAGs with 10 tasks per DAG file |
| (2, 10, 10), # noqa |
| ] |
| ) |
| def test_execute_queries_count_no_harvested_dags(self, expected_query_count, dag_count, task_count): |
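| """Check how many queries a single scheduler loop runs when the processor agent hands back no serialized DAGs.""" |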
| with mock.patch.dict("os.environ", { |
| "PERF_DAGS_COUNT": str(dag_count), |
| "PERF_TASKS_COUNT": str(task_count), |
| "PERF_START_AGO": "1d", |
| "PERF_SCHEDULE_INTERVAL": "30m", |
| "PERF_SHAPE": "no_structure", |
| }), conf_vars({ |
| ('scheduler', 'use_job_schedule'): 'True', |
| ('core', 'load_examples'): 'False', |
| }): |
| |
| dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False) |
| for i, dag in enumerate(dagbag.dags.values()): |
| dr = dag.create_dagrun(state=State.RUNNING, run_id=f"{DagRunType.MANUAL.value}__{i}") |
| for ti in dr.get_task_instances(): |
| ti.set_state(state=State.SCHEDULED) |
| |
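| # The processor agent is mocked to hand back no serialized DAGs for this loop |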
| mock_agent = mock.MagicMock() |
| mock_agent.harvest_serialized_dags.return_value = [] |
| |
| job = SchedulerJob(subdir=PERF_DAGS_FOLDER) |
| job.executor = MockExecutor() |
| job.heartbeat = mock.MagicMock() |
| job.processor_agent = mock_agent |
| |
| with assert_queries_count(expected_query_count): |
| job._run_scheduler_loop() |