| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import datetime |
| import json |
| import logging |
| import threading |
| import unittest |
| from unittest.mock import patch |
| |
| import pytest |
| import sqlalchemy |
| from parameterized import parameterized |
| |
| from airflow import settings |
| from airflow.cli import cli_parser |
| from airflow.exceptions import ( |
| AirflowException, |
| AirflowTaskTimeout, |
| DagConcurrencyLimitReached, |
| NoAvailablePoolSlot, |
| TaskConcurrencyLimitReached, |
| ) |
| from airflow.jobs.backfill_job import BackfillJob |
| from airflow.models import DAG, DagBag, Pool, TaskInstance as TI |
| from airflow.models.dagrun import DagRun |
| from airflow.operators.dummy import DummyOperator |
| from airflow.utils import timezone |
| from airflow.utils.session import create_session |
| from airflow.utils.state import State |
| from airflow.utils.timeout import timeout |
| from airflow.utils.types import DagRunType |
| from tests.test_utils.db import clear_db_pools, clear_db_runs, set_default_pool_slots |
| from tests.test_utils.mock_executor import MockExecutor |
| |
| logger = logging.getLogger(__name__) |
| |
| DEFAULT_DATE = timezone.datetime(2016, 1, 1) |
| |
| |
| @pytest.mark.heisentests |
| class TestBackfillJob(unittest.TestCase): |
| def _get_dummy_dag(self, dag_id, pool=Pool.DEFAULT_POOL_NAME, task_concurrency=None): |
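        """Build a single-task daily DAG (task id ``op``), cleared so each test starts from a clean slate."""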
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') |
| |
| with dag: |
| DummyOperator(task_id='op', pool=pool, task_concurrency=task_concurrency, dag=dag) |
| |
| dag.clear() |
| return dag |
| |
| def _times_called_with(self, method, class_): |
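        """Count how many calls to the mocked ``method`` received an instance
        of ``class_`` as their first positional argument."""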
| count = 0 |
| for args in method.call_args_list: |
| if isinstance(args[0][0], class_): |
| count += 1 |
| return count |
| |
| @classmethod |
| def setUpClass(cls): |
| cls.dagbag = DagBag(include_examples=True) |
| |
| @staticmethod |
| def clean_db(): |
| clear_db_runs() |
| clear_db_pools() |
| |
| def setUp(self): |
| self.clean_db() |
| self.parser = cli_parser.get_parser() |
| |
| def tearDown(self) -> None: |
| self.clean_db() |
| |
| def test_unfinished_dag_runs_set_to_failed(self): |
| dag = self._get_dummy_dag('dummy_dag') |
| |
| dag_run = dag.create_dagrun( |
| run_id='test', |
| state=State.RUNNING, |
| ) |
| |
| job = BackfillJob( |
| dag=dag, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=8), |
| ignore_first_depends_on_past=True, |
| ) |
| |
| job._set_unfinished_dag_runs_to_failed([dag_run]) |
| |
| dag_run.refresh_from_db() |
| |
| self.assertEqual(State.FAILED, dag_run.state) |
| |
| def test_dag_run_with_finished_tasks_set_to_success(self): |
| dag = self._get_dummy_dag('dummy_dag') |
| |
| dag_run = dag.create_dagrun( |
| run_id='test', |
| state=State.RUNNING, |
| ) |
| |
| for ti in dag_run.get_task_instances(): |
| ti.set_state(State.SUCCESS) |
| |
| job = BackfillJob( |
| dag=dag, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=8), |
| ignore_first_depends_on_past=True, |
| ) |
| |
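        # All task instances already finished successfully, so the
        # "unfinished runs" cleanup should resolve this run to SUCCESS
        # instead of FAILED.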
| job._set_unfinished_dag_runs_to_failed([dag_run]) |
| |
| dag_run.refresh_from_db() |
| |
| self.assertEqual(State.SUCCESS, dag_run.state) |
| |
| @pytest.mark.xfail(condition=True, reason="This test is flaky") |
| @pytest.mark.backend("postgres", "mysql") |
| def test_trigger_controller_dag(self): |
| dag = self.dagbag.get_dag('example_trigger_controller_dag') |
| target_dag = self.dagbag.get_dag('example_trigger_target_dag') |
| target_dag.sync_to_db() |
| |
| # dag_file_processor = DagFileProcessor(dag_ids=[], log=Mock()) |
| task_instances_list = [] |
| # task_instances_list = dag_file_processor._process_task_instances( |
| # target_dag, |
| # dag_runs=DagRun.find(dag_id='example_trigger_target_dag') |
| # ) |
| self.assertFalse(task_instances_list) |
| |
| job = BackfillJob( |
| dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_first_depends_on_past=True |
| ) |
| job.run() |
| |
| task_instances_list = [] |
| # task_instances_list = dag_file_processor._process_task_instances( |
| # target_dag, |
| # dag_runs=DagRun.find(dag_id='example_trigger_target_dag') |
| # ) |
| |
| self.assertTrue(task_instances_list) |
| |
| @pytest.mark.backend("postgres", "mysql") |
| def test_backfill_multi_dates(self): |
| dag = self.dagbag.get_dag('example_bash_operator') |
| |
| end_date = DEFAULT_DATE + datetime.timedelta(days=1) |
| |
| executor = MockExecutor(parallelism=16) |
| job = BackfillJob( |
| dag=dag, |
| start_date=DEFAULT_DATE, |
| end_date=end_date, |
| executor=executor, |
| ignore_first_depends_on_past=True, |
| ) |
| |
| job.run() |
| |
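        # Tasks must complete in dependency order for both execution dates;
        # the exact interleaving below reflects how MockExecutor batches and
        # sorts the queued task instances.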
| expected_execution_order = [ |
| ("runme_0", DEFAULT_DATE), |
| ("runme_1", DEFAULT_DATE), |
| ("runme_2", DEFAULT_DATE), |
| ("runme_0", end_date), |
| ("runme_1", end_date), |
| ("runme_2", end_date), |
| ("also_run_this", DEFAULT_DATE), |
| ("also_run_this", end_date), |
| ("run_after_loop", DEFAULT_DATE), |
| ("run_after_loop", end_date), |
| ("run_this_last", DEFAULT_DATE), |
| ("run_this_last", end_date), |
| ] |
| self.assertListEqual( |
| [ |
| ((dag.dag_id, task_id, when, 1), (State.SUCCESS, None)) |
| for (task_id, when) in expected_execution_order |
| ], |
| executor.sorted_tasks, |
| ) |
| |
| session = settings.Session() |
| drs = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.execution_date).all() |
| |
        self.assertEqual(DEFAULT_DATE, drs[0].execution_date)
        self.assertEqual(State.SUCCESS, drs[0].state)
        self.assertEqual(DEFAULT_DATE + datetime.timedelta(days=1), drs[1].execution_date)
        self.assertEqual(State.SUCCESS, drs[1].state)
| |
| dag.clear() |
| session.close() |
| |
| @pytest.mark.backend("postgres", "mysql") |
| @parameterized.expand( |
| [ |
| [ |
| "example_branch_operator", |
| ( |
| "run_this_first", |
| "branching", |
| "branch_a", |
| "branch_b", |
| "branch_c", |
| "branch_d", |
| "follow_branch_a", |
| "follow_branch_b", |
| "follow_branch_c", |
| "follow_branch_d", |
| "join", |
| ), |
| ], |
| [ |
| "example_bash_operator", |
| ("runme_0", "runme_1", "runme_2", "also_run_this", "run_after_loop", "run_this_last"), |
| ], |
| [ |
| "example_skip_dag", |
| ( |
| "always_true_1", |
| "always_true_2", |
| "skip_operator_1", |
| "skip_operator_2", |
| "all_success", |
| "one_success", |
| "final_1", |
| "final_2", |
| ), |
| ], |
| ["latest_only", ("latest_only", "task1")], |
| ] |
| ) |
| def test_backfill_examples(self, dag_id, expected_execution_order): |
| """ |
| Test backfilling example dags |
| |
| Try to backfill some of the example dags. Be careful, not all dags are suitable |
| for doing this. For example, a dag that sleeps forever, or does not have a |
| schedule won't work here since you simply can't backfill them. |
| """ |
| dag = self.dagbag.get_dag(dag_id) |
| |
| logger.info('*** Running example DAG: %s', dag.dag_id) |
| executor = MockExecutor() |
| job = BackfillJob( |
| dag=dag, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE, |
| executor=executor, |
| ignore_first_depends_on_past=True, |
| ) |
| |
| job.run() |
| self.assertListEqual( |
| [ |
| ((dag_id, task_id, DEFAULT_DATE, 1), (State.SUCCESS, None)) |
| for task_id in expected_execution_order |
| ], |
| executor.sorted_tasks, |
| ) |
| |
| def test_backfill_conf(self): |
| dag = self._get_dummy_dag('test_backfill_conf') |
| |
| executor = MockExecutor() |
| |
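        # The conf is parsed from a JSON string, as it would arrive from the
        # CLI's --conf option, and should be attached verbatim to the dag runs
        # the backfill creates.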
| conf_ = json.loads("""{"key": "value"}""") |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=2), |
| conf=conf_, |
| ) |
| job.run() |
| |
| dr = DagRun.find(dag_id='test_backfill_conf') |
| |
| self.assertEqual(conf_, dr[0].conf) |
| |
| @patch('airflow.jobs.backfill_job.BackfillJob.log') |
| def test_backfill_respect_task_concurrency_limit(self, mock_log): |
| task_concurrency = 2 |
| dag = self._get_dummy_dag( |
| 'test_backfill_respect_task_concurrency_limit', |
| task_concurrency=task_concurrency, |
| ) |
| |
| executor = MockExecutor() |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=7), |
| ) |
| |
| job.run() |
| |
| self.assertGreater(len(executor.history), 0) |
| |
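        # Each entry in executor.history is the batch of task instances that
        # were queued together in one backfill pass; no batch may exceed the
        # per-task concurrency cap, and the cap should be reached at least
        # once across the 8 daily runs.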
| task_concurrency_limit_reached_at_least_once = False |
| |
| num_running_task_instances = 0 |
| for running_task_instances in executor.history: |
| self.assertLessEqual(len(running_task_instances), task_concurrency) |
| num_running_task_instances += len(running_task_instances) |
| if len(running_task_instances) == task_concurrency: |
| task_concurrency_limit_reached_at_least_once = True |
| |
| self.assertEqual(8, num_running_task_instances) |
| self.assertTrue(task_concurrency_limit_reached_at_least_once) |
| |
| times_dag_concurrency_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| DagConcurrencyLimitReached, |
| ) |
| |
| times_pool_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| NoAvailablePoolSlot, |
| ) |
| |
| times_task_concurrency_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| TaskConcurrencyLimitReached, |
| ) |
| |
| self.assertEqual(0, times_pool_limit_reached_in_debug) |
| self.assertEqual(0, times_dag_concurrency_limit_reached_in_debug) |
| self.assertGreater(times_task_concurrency_limit_reached_in_debug, 0) |
| |
| @patch('airflow.jobs.backfill_job.BackfillJob.log') |
| def test_backfill_respect_dag_concurrency_limit(self, mock_log): |
| |
| dag = self._get_dummy_dag('test_backfill_respect_concurrency_limit') |
| dag.concurrency = 2 |
| |
| executor = MockExecutor() |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=7), |
| ) |
| |
| job.run() |
| |
| self.assertGreater(len(executor.history), 0) |
| |
| concurrency_limit_reached_at_least_once = False |
| |
| num_running_task_instances = 0 |
| |
| for running_task_instances in executor.history: |
| self.assertLessEqual(len(running_task_instances), dag.concurrency) |
| num_running_task_instances += len(running_task_instances) |
| if len(running_task_instances) == dag.concurrency: |
| concurrency_limit_reached_at_least_once = True |
| |
| self.assertEqual(8, num_running_task_instances) |
| self.assertTrue(concurrency_limit_reached_at_least_once) |
| |
| times_dag_concurrency_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| DagConcurrencyLimitReached, |
| ) |
| |
| times_pool_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| NoAvailablePoolSlot, |
| ) |
| |
| times_task_concurrency_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| TaskConcurrencyLimitReached, |
| ) |
| |
| self.assertEqual(0, times_pool_limit_reached_in_debug) |
| self.assertEqual(0, times_task_concurrency_limit_reached_in_debug) |
| self.assertGreater(times_dag_concurrency_limit_reached_in_debug, 0) |
| |
| @patch('airflow.jobs.backfill_job.BackfillJob.log') |
| def test_backfill_respect_default_pool_limit(self, mock_log): |
| default_pool_slots = 2 |
| set_default_pool_slots(default_pool_slots) |
| |
| dag = self._get_dummy_dag('test_backfill_with_no_pool_limit') |
| |
| executor = MockExecutor() |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=7), |
| ) |
| |
| job.run() |
| |
| self.assertGreater(len(executor.history), 0) |
| |
| default_pool_task_slot_count_reached_at_least_once = False |
| |
| num_running_task_instances = 0 |
| |
        # If no pool is specified, the number of task instances running in
        # parallel during the backfill must never exceed the default pool's
        # slot count.
| for running_task_instances in executor.history: |
| self.assertLessEqual( |
| len(running_task_instances), |
| default_pool_slots, |
| ) |
| num_running_task_instances += len(running_task_instances) |
| if len(running_task_instances) == default_pool_slots: |
| default_pool_task_slot_count_reached_at_least_once = True |
| |
| self.assertEqual(8, num_running_task_instances) |
| self.assertTrue(default_pool_task_slot_count_reached_at_least_once) |
| |
| times_dag_concurrency_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| DagConcurrencyLimitReached, |
| ) |
| |
| times_pool_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| NoAvailablePoolSlot, |
| ) |
| |
| times_task_concurrency_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| TaskConcurrencyLimitReached, |
| ) |
| |
| self.assertEqual(0, times_dag_concurrency_limit_reached_in_debug) |
| self.assertEqual(0, times_task_concurrency_limit_reached_in_debug) |
| self.assertGreater(times_pool_limit_reached_in_debug, 0) |
| |
| def test_backfill_pool_not_found(self): |
| dag = self._get_dummy_dag( |
| dag_id='test_backfill_pool_not_found', |
| pool='king_pool', |
| ) |
| |
| executor = MockExecutor() |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=7), |
| ) |
| |
        with self.assertRaises(AirflowException):
            job.run()
| |
| @patch('airflow.jobs.backfill_job.BackfillJob.log') |
| def test_backfill_respect_pool_limit(self, mock_log): |
| session = settings.Session() |
| |
| slots = 2 |
| pool = Pool( |
| pool='pool_with_two_slots', |
| slots=slots, |
| ) |
| session.add(pool) |
| session.commit() |
| |
| dag = self._get_dummy_dag( |
| dag_id='test_backfill_respect_pool_limit', |
| pool=pool.pool, |
| ) |
| |
| executor = MockExecutor() |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=7), |
| ) |
| |
| job.run() |
| |
| self.assertGreater(len(executor.history), 0) |
| |
| pool_was_full_at_least_once = False |
| num_running_task_instances = 0 |
| |
| for running_task_instances in executor.history: |
| self.assertLessEqual(len(running_task_instances), slots) |
| num_running_task_instances += len(running_task_instances) |
| if len(running_task_instances) == slots: |
| pool_was_full_at_least_once = True |
| |
| self.assertEqual(8, num_running_task_instances) |
| self.assertTrue(pool_was_full_at_least_once) |
| |
| times_dag_concurrency_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| DagConcurrencyLimitReached, |
| ) |
| |
| times_pool_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| NoAvailablePoolSlot, |
| ) |
| |
| times_task_concurrency_limit_reached_in_debug = self._times_called_with( |
| mock_log.debug, |
| TaskConcurrencyLimitReached, |
| ) |
| |
| self.assertEqual(0, times_task_concurrency_limit_reached_in_debug) |
| self.assertEqual(0, times_dag_concurrency_limit_reached_in_debug) |
| self.assertGreater(times_pool_limit_reached_in_debug, 0) |
| |
| def test_backfill_run_rescheduled(self): |
| dag = DAG(dag_id='test_backfill_run_rescheduled', start_date=DEFAULT_DATE, schedule_interval='@daily') |
| |
| with dag: |
| DummyOperator( |
| task_id='test_backfill_run_rescheduled_task-1', |
| dag=dag, |
| ) |
| |
| dag.clear() |
| |
| executor = MockExecutor() |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=2), |
| ) |
| job.run() |
| |
| ti = TI(task=dag.get_task('test_backfill_run_rescheduled_task-1'), execution_date=DEFAULT_DATE) |
| ti.refresh_from_db() |
| ti.set_state(State.UP_FOR_RESCHEDULE) |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=2), |
| rerun_failed_tasks=True, |
| ) |
| job.run() |
| ti = TI(task=dag.get_task('test_backfill_run_rescheduled_task-1'), execution_date=DEFAULT_DATE) |
| ti.refresh_from_db() |
| self.assertEqual(ti.state, State.SUCCESS) |
| |
| def test_backfill_rerun_failed_tasks(self): |
| dag = DAG(dag_id='test_backfill_rerun_failed', start_date=DEFAULT_DATE, schedule_interval='@daily') |
| |
| with dag: |
| DummyOperator(task_id='test_backfill_rerun_failed_task-1', dag=dag) |
| |
| dag.clear() |
| |
| executor = MockExecutor() |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=2), |
| ) |
| job.run() |
| |
| ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'), execution_date=DEFAULT_DATE) |
| ti.refresh_from_db() |
| ti.set_state(State.FAILED) |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=2), |
| rerun_failed_tasks=True, |
| ) |
| job.run() |
| ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'), execution_date=DEFAULT_DATE) |
| ti.refresh_from_db() |
| self.assertEqual(ti.state, State.SUCCESS) |
| |
| def test_backfill_rerun_upstream_failed_tasks(self): |
| dag = DAG( |
| dag_id='test_backfill_rerun_upstream_failed', start_date=DEFAULT_DATE, schedule_interval='@daily' |
| ) |
| |
| with dag: |
| op1 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-1', dag=dag) |
| op2 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-2', dag=dag) |
| op1.set_upstream(op2) |
| |
| dag.clear() |
| executor = MockExecutor() |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=2), |
| ) |
| job.run() |
| |
| ti = TI(task=dag.get_task('test_backfill_rerun_upstream_failed_task-1'), execution_date=DEFAULT_DATE) |
| ti.refresh_from_db() |
| ti.set_state(State.UPSTREAM_FAILED) |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=2), |
| rerun_failed_tasks=True, |
| ) |
| job.run() |
| ti = TI(task=dag.get_task('test_backfill_rerun_upstream_failed_task-1'), execution_date=DEFAULT_DATE) |
| ti.refresh_from_db() |
| self.assertEqual(ti.state, State.SUCCESS) |
| |
| def test_backfill_rerun_failed_tasks_without_flag(self): |
| dag = DAG(dag_id='test_backfill_rerun_failed', start_date=DEFAULT_DATE, schedule_interval='@daily') |
| |
| with dag: |
| DummyOperator(task_id='test_backfill_rerun_failed_task-1', dag=dag) |
| |
| dag.clear() |
| |
| executor = MockExecutor() |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=2), |
| ) |
| job.run() |
| |
| ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'), execution_date=DEFAULT_DATE) |
| ti.refresh_from_db() |
| ti.set_state(State.FAILED) |
| |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=2), |
| rerun_failed_tasks=False, |
| ) |
| |
| with self.assertRaises(AirflowException): |
| job.run() |
| |
| def test_backfill_ordered_concurrent_execute(self): |
| dag = DAG( |
| dag_id='test_backfill_ordered_concurrent_execute', |
| start_date=DEFAULT_DATE, |
| schedule_interval="@daily", |
| ) |
| |
| with dag: |
| op1 = DummyOperator(task_id='leave1') |
| op2 = DummyOperator(task_id='leave2') |
| op3 = DummyOperator(task_id='upstream_level_1') |
| op4 = DummyOperator(task_id='upstream_level_2') |
| op5 = DummyOperator(task_id='upstream_level_3') |
            # wire the dependencies in a deliberately scrambled order
| op2.set_downstream(op3) |
| op1.set_downstream(op3) |
| op4.set_downstream(op5) |
| op3.set_downstream(op4) |
| |
| dag.clear() |
| |
| executor = MockExecutor(parallelism=16) |
| job = BackfillJob( |
| dag=dag, |
| executor=executor, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=2), |
| ) |
| job.run() |
| |
| date0 = DEFAULT_DATE |
| date1 = date0 + datetime.timedelta(days=1) |
| date2 = date1 + datetime.timedelta(days=1) |
| |
        # the executor history records one batch of task instances per pass
| history = executor.history |
| |
| self.assertListEqual( |
            # key[0] is the dag id and key[3] is the try_number; we don't care about either of those here
| [sorted([item[-1].key[1:3] for item in batch]) for batch in history], |
| [ |
| [ |
| ('leave1', date0), |
| ('leave1', date1), |
| ('leave1', date2), |
| ('leave2', date0), |
| ('leave2', date1), |
| ('leave2', date2), |
| ], |
| [('upstream_level_1', date0), ('upstream_level_1', date1), ('upstream_level_1', date2)], |
| [('upstream_level_2', date0), ('upstream_level_2', date1), ('upstream_level_2', date2)], |
| [('upstream_level_3', date0), ('upstream_level_3', date1), ('upstream_level_3', date2)], |
| ], |
| ) |
| |
| def test_backfill_pooled_tasks(self): |
| """ |
| Test that queued tasks are executed by BackfillJob |
| """ |
| session = settings.Session() |
| pool = Pool(pool='test_backfill_pooled_task_pool', slots=1) |
| session.add(pool) |
| session.commit() |
| session.close() |
| |
| dag = self.dagbag.get_dag('test_backfill_pooled_task_dag') |
| dag.clear() |
| |
| executor = MockExecutor(do_update=True) |
| job = BackfillJob(dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, executor=executor) |
| |
        # run with a timeout because this creates an infinite loop if not caught
| try: |
| with timeout(seconds=5): |
| job.run() |
| except AirflowTaskTimeout: |
| pass |
| ti = TI(task=dag.get_task('test_backfill_pooled_task'), execution_date=DEFAULT_DATE) |
| ti.refresh_from_db() |
| self.assertEqual(ti.state, State.SUCCESS) |
| |
| def test_backfill_depends_on_past(self): |
| """ |
| Test that backfill respects ignore_depends_on_past |
| """ |
| dag = self.dagbag.get_dag('test_depends_on_past') |
| dag.clear() |
| run_date = DEFAULT_DATE + datetime.timedelta(days=5) |
| |
| # backfill should deadlock |
| self.assertRaisesRegex( |
| AirflowException, |
| 'BackfillJob is deadlocked', |
| BackfillJob(dag=dag, start_date=run_date, end_date=run_date).run, |
| ) |
| |
| BackfillJob( |
| dag=dag, |
| start_date=run_date, |
| end_date=run_date, |
| executor=MockExecutor(), |
| ignore_first_depends_on_past=True, |
| ).run() |
| |
| # ti should have succeeded |
| ti = TI(dag.tasks[0], run_date) |
| ti.refresh_from_db() |
| self.assertEqual(ti.state, State.SUCCESS) |
| |
| def test_backfill_depends_on_past_backwards(self): |
| """ |
| Test that CLI respects -B argument and raises on interaction with depends_on_past |
| """ |
| dag_id = 'test_depends_on_past' |
| start_date = DEFAULT_DATE + datetime.timedelta(days=1) |
| end_date = start_date + datetime.timedelta(days=1) |
| kwargs = dict( |
| start_date=start_date, |
| end_date=end_date, |
| ) |
| dag = self.dagbag.get_dag(dag_id) |
| dag.clear() |
| |
| executor = MockExecutor() |
| job = BackfillJob(dag=dag, executor=executor, ignore_first_depends_on_past=True, **kwargs) |
| job.run() |
| |
| ti = TI(dag.get_task('test_dop_task'), end_date) |
| ti.refresh_from_db() |
| # runs fine forwards |
| self.assertEqual(ti.state, State.SUCCESS) |
| |
| # raises backwards |
| expected_msg = 'You cannot backfill backwards because one or more tasks depend_on_past: {}'.format( |
| 'test_dop_task' |
| ) |
| with self.assertRaisesRegex(AirflowException, expected_msg): |
| executor = MockExecutor() |
| job = BackfillJob(dag=dag, executor=executor, run_backwards=True, **kwargs) |
| job.run() |
| |
| def test_cli_receives_delay_arg(self): |
| """ |
| Tests that the --delay argument is passed correctly to the BackfillJob |
| """ |
| dag_id = 'example_bash_operator' |
| run_date = DEFAULT_DATE |
| args = [ |
| 'dags', |
| 'backfill', |
| dag_id, |
| '-s', |
| run_date.isoformat(), |
| '--delay-on-limit', |
| '0.5', |
| ] |
| parsed_args = self.parser.parse_args(args) |
| self.assertEqual(0.5, parsed_args.delay_on_limit) |
| |
| def _get_dag_test_max_active_limits(self, dag_id, max_active_runs=1): |
| dag = DAG( |
| dag_id=dag_id, |
| start_date=DEFAULT_DATE, |
| schedule_interval="@hourly", |
| max_active_runs=max_active_runs, |
| ) |
| |
| with dag: |
| op1 = DummyOperator(task_id='leave1') |
| op2 = DummyOperator(task_id='leave2') |
| op3 = DummyOperator(task_id='upstream_level_1') |
| op4 = DummyOperator(task_id='upstream_level_2') |
| |
| op1 >> op2 >> op3 |
| op4 >> op3 |
| |
| dag.clear() |
| return dag |
| |
| def test_backfill_max_limit_check_within_limit(self): |
| dag = self._get_dag_test_max_active_limits( |
| 'test_backfill_max_limit_check_within_limit', max_active_runs=16 |
| ) |
| |
| start_date = DEFAULT_DATE - datetime.timedelta(hours=1) |
| end_date = DEFAULT_DATE |
| |
| executor = MockExecutor() |
| job = BackfillJob( |
| dag=dag, start_date=start_date, end_date=end_date, executor=executor, donot_pickle=True |
| ) |
| job.run() |
| |
| dagruns = DagRun.find(dag_id=dag.dag_id) |
| self.assertEqual(2, len(dagruns)) |
| self.assertTrue(all(run.state == State.SUCCESS for run in dagruns)) |
| |
| def test_backfill_max_limit_check(self): |
| dag_id = 'test_backfill_max_limit_check' |
| run_id = 'test_dagrun' |
| start_date = DEFAULT_DATE - datetime.timedelta(hours=1) |
| end_date = DEFAULT_DATE |
| |
| dag_run_created_cond = threading.Condition() |
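        # Synchronization: the backfill thread first creates an out-of-range
        # dag run and notifies this condition; the main thread then marks that
        # run SUCCESS so the backfill can claim the freed max_active_runs slot.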
| |
| def run_backfill(cond): |
| cond.acquire() |
| # this session object is different than the one in the main thread |
| with create_session() as thread_session: |
| try: |
| dag = self._get_dag_test_max_active_limits(dag_id) |
| |
| # Existing dagrun that is not within the backfill range |
| dag.create_dagrun( |
| run_id=run_id, |
| state=State.RUNNING, |
| execution_date=DEFAULT_DATE + datetime.timedelta(hours=1), |
| start_date=DEFAULT_DATE, |
| ) |
| |
| thread_session.commit() |
| cond.notify() |
| finally: |
| cond.release() |
| thread_session.close() |
| |
| executor = MockExecutor() |
| job = BackfillJob( |
| dag=dag, start_date=start_date, end_date=end_date, executor=executor, donot_pickle=True |
| ) |
| job.run() |
| |
| backfill_job_thread = threading.Thread( |
| target=run_backfill, name="run_backfill", args=(dag_run_created_cond,) |
| ) |
| |
| dag_run_created_cond.acquire() |
| with create_session() as session: |
| backfill_job_thread.start() |
| try: |
                # At this point the backfill cannot proceed: max_active_runs
                # has been reached by the existing run, so the job is waiting.
| dag_run_created_cond.wait(timeout=1.5) |
| dagruns = DagRun.find(dag_id=dag_id) |
| dr = dagruns[0] |
| self.assertEqual(1, len(dagruns)) |
| self.assertEqual(dr.run_id, run_id) |
| |
                # Allow the backfill to proceed by setting the existing dag
                # run to SUCCESS; the backfill then executes its dag runs one
                # by one.
| dr.set_state(State.SUCCESS) |
| session.merge(dr) |
| session.commit() |
| |
| backfill_job_thread.join() |
| |
| dagruns = DagRun.find(dag_id=dag_id) |
| self.assertEqual(3, len(dagruns)) # 2 from backfill + 1 existing |
| self.assertEqual(dagruns[-1].run_id, dr.run_id) |
| finally: |
| dag_run_created_cond.release() |
| |
| def test_backfill_max_limit_check_no_count_existing(self): |
| dag = self._get_dag_test_max_active_limits('test_backfill_max_limit_check_no_count_existing') |
| start_date = DEFAULT_DATE |
| end_date = DEFAULT_DATE |
| |
| # Existing dagrun that is within the backfill range |
| dag.create_dagrun( |
| run_id="test_existing_backfill", |
| state=State.RUNNING, |
| execution_date=DEFAULT_DATE, |
| start_date=DEFAULT_DATE, |
| ) |
| |
| executor = MockExecutor() |
| job = BackfillJob( |
| dag=dag, start_date=start_date, end_date=end_date, executor=executor, donot_pickle=True |
| ) |
| job.run() |
| |
        # BackfillJob will run because the existing DagRun falls within the
        # backfill date range and so does not count toward the max active limit.
        dagruns = DagRun.find(dag_id=dag.dag_id)
        # Only one run (the existing one) can execute, since max_active_runs
        # leaves just a single dag run slot.
        self.assertEqual(1, len(dagruns))
        self.assertEqual(State.SUCCESS, dagruns[0].state)
| |
| def test_backfill_max_limit_check_complete_loop(self): |
| dag = self._get_dag_test_max_active_limits('test_backfill_max_limit_check_complete_loop') |
| start_date = DEFAULT_DATE - datetime.timedelta(hours=1) |
| end_date = DEFAULT_DATE |
| |
        # With max_active_runs capped at 1, the backfill has to work through
        # the two hourly dag runs sequentially, one at a time.
        success_expected = 2
| executor = MockExecutor() |
| job = BackfillJob( |
| dag=dag, start_date=start_date, end_date=end_date, executor=executor, donot_pickle=True |
| ) |
| job.run() |
| |
| success_dagruns = len(DagRun.find(dag_id=dag.dag_id, state=State.SUCCESS)) |
| running_dagruns = len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING)) |
| self.assertEqual(success_expected, success_dagruns) |
| self.assertEqual(0, running_dagruns) # no dag_runs in running state are left |
| |
| def test_sub_set_subdag(self): |
| dag = DAG('test_sub_set_subdag', start_date=DEFAULT_DATE, default_args={'owner': 'owner1'}) |
| |
| with dag: |
| op1 = DummyOperator(task_id='leave1') |
| op2 = DummyOperator(task_id='leave2') |
| op3 = DummyOperator(task_id='upstream_level_1') |
| op4 = DummyOperator(task_id='upstream_level_2') |
| op5 = DummyOperator(task_id='upstream_level_3') |
            # wire the dependencies in a deliberately scrambled order
| op2.set_downstream(op3) |
| op1.set_downstream(op3) |
| op4.set_downstream(op5) |
| op3.set_downstream(op4) |
| |
| dag.clear() |
| dr = dag.create_dagrun( |
| run_id="test", state=State.RUNNING, execution_date=DEFAULT_DATE, start_date=DEFAULT_DATE |
| ) |
| |
| executor = MockExecutor() |
| sub_dag = dag.sub_dag(task_ids_or_regex="leave*", include_downstream=False, include_upstream=False) |
| job = BackfillJob(dag=sub_dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, executor=executor) |
| job.run() |
| |
| self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db) |
| # the run_id should have changed, so a refresh won't work |
| drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE) |
| dr = drs[0] |
| |
| self.assertEqual(DagRun.generate_run_id(DagRunType.BACKFILL_JOB, DEFAULT_DATE), dr.run_id) |
| for ti in dr.get_task_instances(): |
| if ti.task_id == 'leave1' or ti.task_id == 'leave2': |
| self.assertEqual(State.SUCCESS, ti.state) |
| else: |
| self.assertEqual(State.NONE, ti.state) |
| |
| def test_backfill_fill_blanks(self): |
| dag = DAG( |
| 'test_backfill_fill_blanks', |
| start_date=DEFAULT_DATE, |
| default_args={'owner': 'owner1'}, |
| ) |
| |
| with dag: |
| op1 = DummyOperator(task_id='op1') |
| op2 = DummyOperator(task_id='op2') |
| op3 = DummyOperator(task_id='op3') |
| op4 = DummyOperator(task_id='op4') |
| op5 = DummyOperator(task_id='op5') |
| op6 = DummyOperator(task_id='op6') |
| |
| dag.clear() |
| dr = dag.create_dagrun( |
| run_id='test', state=State.RUNNING, execution_date=DEFAULT_DATE, start_date=DEFAULT_DATE |
| ) |
| executor = MockExecutor() |
| |
| session = settings.Session() |
| |
| tis = dr.get_task_instances() |
| for ti in tis: |
| if ti.task_id == op1.task_id: |
| ti.state = State.UP_FOR_RETRY |
| ti.end_date = DEFAULT_DATE |
| elif ti.task_id == op2.task_id: |
| ti.state = State.FAILED |
| elif ti.task_id == op3.task_id: |
| ti.state = State.SKIPPED |
| elif ti.task_id == op4.task_id: |
| ti.state = State.SCHEDULED |
| elif ti.task_id == op5.task_id: |
| ti.state = State.UPSTREAM_FAILED |
            # op6 is deliberately left with no state (State.NONE)
| session.merge(ti) |
| session.commit() |
| session.close() |
| |
| job = BackfillJob(dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, executor=executor) |
| self.assertRaisesRegex(AirflowException, 'Some task instances failed', job.run) |
| |
| self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db) |
| # the run_id should have changed, so a refresh won't work |
| drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE) |
| dr = drs[0] |
| |
| self.assertEqual(dr.state, State.FAILED) |
| |
| tis = dr.get_task_instances() |
| for ti in tis: |
| if ti.task_id in (op1.task_id, op4.task_id, op6.task_id): |
| self.assertEqual(ti.state, State.SUCCESS) |
| elif ti.task_id == op2.task_id: |
| self.assertEqual(ti.state, State.FAILED) |
| elif ti.task_id == op3.task_id: |
| self.assertEqual(ti.state, State.SKIPPED) |
| elif ti.task_id == op5.task_id: |
| self.assertEqual(ti.state, State.UPSTREAM_FAILED) |
| |
| def test_backfill_execute_subdag(self): |
| dag = self.dagbag.get_dag('example_subdag_operator') |
| subdag_op_task = dag.get_task('section-1') |
| |
| subdag = subdag_op_task.subdag |
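        # Pin the subdag to an explicit daily schedule so the single-date
        # backfill below produces exactly one run.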
| subdag.schedule_interval = '@daily' |
| |
| start_date = timezone.utcnow() |
| executor = MockExecutor() |
| job = BackfillJob( |
| dag=subdag, start_date=start_date, end_date=start_date, executor=executor, donot_pickle=True |
| ) |
| job.run() |
| |
| subdag_op_task.pre_execute(context={'execution_date': start_date}) |
| subdag_op_task.execute(context={'execution_date': start_date}) |
| subdag_op_task.post_execute(context={'execution_date': start_date}) |
| |
| history = executor.history |
| subdag_history = history[0] |
| |
| # check that all 5 task instances of the subdag 'section-1' were executed |
| self.assertEqual(5, len(subdag_history)) |
| for sdh in subdag_history: |
| ti = sdh[3] |
| self.assertIn('section-1-task-', ti.task_id) |
| |
| with create_session() as session: |
| successful_subdag_runs = ( |
| session.query(DagRun) |
| .filter(DagRun.dag_id == subdag.dag_id) |
| .filter(DagRun.execution_date == start_date) |
| # pylint: disable=comparison-with-callable |
| .filter(DagRun.state == State.SUCCESS) |
| .count() |
| ) |
| |
| self.assertEqual(1, successful_subdag_runs) |
| |
| subdag.clear() |
| dag.clear() |
| |
| def test_subdag_clear_parentdag_downstream_clear(self): |
| dag = self.dagbag.get_dag('clear_subdag_test_dag') |
| subdag_op_task = dag.get_task('daily_job') |
| |
| subdag = subdag_op_task.subdag |
| |
| executor = MockExecutor() |
| job = BackfillJob( |
| dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, executor=executor, donot_pickle=True |
| ) |
| |
| with timeout(seconds=30): |
| job.run() |
| |
| ti_subdag = TI(task=dag.get_task('daily_job'), execution_date=DEFAULT_DATE) |
| ti_subdag.refresh_from_db() |
| self.assertEqual(ti_subdag.state, State.SUCCESS) |
| |
| ti_irrelevant = TI(task=dag.get_task('daily_job_irrelevant'), execution_date=DEFAULT_DATE) |
| ti_irrelevant.refresh_from_db() |
| self.assertEqual(ti_irrelevant.state, State.SUCCESS) |
| |
| ti_downstream = TI(task=dag.get_task('daily_job_downstream'), execution_date=DEFAULT_DATE) |
| ti_downstream.refresh_from_db() |
| self.assertEqual(ti_downstream.state, State.SUCCESS) |
| |
| sdag = subdag.sub_dag( |
| task_ids_or_regex='daily_job_subdag_task', include_downstream=True, include_upstream=False |
| ) |
| |
| sdag.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, include_parentdag=True) |
| |
| ti_subdag.refresh_from_db() |
| self.assertEqual(State.NONE, ti_subdag.state) |
| |
| ti_irrelevant.refresh_from_db() |
| self.assertEqual(State.SUCCESS, ti_irrelevant.state) |
| |
| ti_downstream.refresh_from_db() |
| self.assertEqual(State.NONE, ti_downstream.state) |
| |
| subdag.clear() |
| dag.clear() |
| |
| def test_backfill_execute_subdag_with_removed_task(self): |
| """ |
| Ensure that subdag operators execute properly in the case where |
| an associated task of the subdag has been removed from the dag |
| definition, but has instances in the database from previous runs. |
| """ |
| dag = self.dagbag.get_dag('example_subdag_operator') |
| subdag = dag.get_task('section-1').subdag |
| |
| executor = MockExecutor() |
| job = BackfillJob( |
| dag=subdag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, executor=executor, donot_pickle=True |
| ) |
| |
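        # Create a task instance for a task that is no longer part of the
        # subdag; the backfill must leave it in REMOVED instead of trying to
        # schedule it.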
| removed_task_ti = TI( |
| task=DummyOperator(task_id='removed_task'), execution_date=DEFAULT_DATE, state=State.REMOVED |
| ) |
| removed_task_ti.dag_id = subdag.dag_id |
| |
| session = settings.Session() |
| session.merge(removed_task_ti) |
| session.commit() |
| |
| with timeout(seconds=30): |
| job.run() |
| |
| for task in subdag.tasks: |
| instance = ( |
| session.query(TI) |
| .filter( |
| TI.dag_id == subdag.dag_id, TI.task_id == task.task_id, TI.execution_date == DEFAULT_DATE |
| ) |
| .first() |
| ) |
| |
| self.assertIsNotNone(instance) |
| self.assertEqual(instance.state, State.SUCCESS) |
| |
| removed_task_ti.refresh_from_db() |
| self.assertEqual(removed_task_ti.state, State.REMOVED) |
| |
| subdag.clear() |
| dag.clear() |
| |
| def test_update_counters(self): |
| dag = DAG(dag_id='test_manage_executor_state', start_date=DEFAULT_DATE) |
| |
| task1 = DummyOperator(task_id='dummy', dag=dag, owner='airflow') |
| |
| job = BackfillJob(dag=dag) |
| |
| session = settings.Session() |
| dr = dag.create_dagrun( |
| run_type=DagRunType.SCHEDULED, |
| state=State.RUNNING, |
| execution_date=DEFAULT_DATE, |
| start_date=DEFAULT_DATE, |
| session=session, |
| ) |
| ti = TI(task1, dr.execution_date) |
| ti.refresh_from_db() |
| |
| ti_status = BackfillJob._DagRunTaskStatus() |
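        # _DagRunTaskStatus is the backfill's bookkeeping object:
        # _update_counters moves finished TIs out of `running` into
        # `succeeded`/`skipped`/`failed`, and re-adds TIs to `to_run` when
        # their state warrants another attempt.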
| |
| # test for success |
| ti.set_state(State.SUCCESS, session) |
| ti_status.running[ti.key] = ti |
| job._update_counters(ti_status=ti_status) |
        self.assertEqual(0, len(ti_status.running))
        self.assertEqual(1, len(ti_status.succeeded))
        self.assertEqual(0, len(ti_status.skipped))
        self.assertEqual(0, len(ti_status.failed))
        self.assertEqual(0, len(ti_status.to_run))
| |
| ti_status.succeeded.clear() |
| |
| # test for skipped |
| ti.set_state(State.SKIPPED, session) |
| ti_status.running[ti.key] = ti |
| job._update_counters(ti_status=ti_status) |
        self.assertEqual(0, len(ti_status.running))
        self.assertEqual(0, len(ti_status.succeeded))
        self.assertEqual(1, len(ti_status.skipped))
        self.assertEqual(0, len(ti_status.failed))
        self.assertEqual(0, len(ti_status.to_run))
| |
| ti_status.skipped.clear() |
| |
| # test for failed |
| ti.set_state(State.FAILED, session) |
| ti_status.running[ti.key] = ti |
| job._update_counters(ti_status=ti_status) |
        self.assertEqual(0, len(ti_status.running))
        self.assertEqual(0, len(ti_status.succeeded))
        self.assertEqual(0, len(ti_status.skipped))
        self.assertEqual(1, len(ti_status.failed))
        self.assertEqual(0, len(ti_status.to_run))
| |
| ti_status.failed.clear() |
| |
| # test for retry |
| ti.set_state(State.UP_FOR_RETRY, session) |
| ti_status.running[ti.key] = ti |
| job._update_counters(ti_status=ti_status) |
        self.assertEqual(0, len(ti_status.running))
        self.assertEqual(0, len(ti_status.succeeded))
        self.assertEqual(0, len(ti_status.skipped))
        self.assertEqual(0, len(ti_status.failed))
        self.assertEqual(1, len(ti_status.to_run))
| |
| ti_status.to_run.clear() |
| |
| # test for reschedule |
| ti.set_state(State.UP_FOR_RESCHEDULE, session) |
| ti_status.running[ti.key] = ti |
| job._update_counters(ti_status=ti_status) |
        self.assertEqual(0, len(ti_status.running))
        self.assertEqual(0, len(ti_status.succeeded))
        self.assertEqual(0, len(ti_status.skipped))
        self.assertEqual(0, len(ti_status.failed))
        self.assertEqual(1, len(ti_status.to_run))
| |
| ti_status.to_run.clear() |
| |
| # test for none |
| ti.set_state(State.NONE, session) |
| ti_status.running[ti.key] = ti |
| job._update_counters(ti_status=ti_status) |
        self.assertEqual(0, len(ti_status.running))
        self.assertEqual(0, len(ti_status.succeeded))
        self.assertEqual(0, len(ti_status.skipped))
        self.assertEqual(0, len(ti_status.failed))
        self.assertEqual(1, len(ti_status.to_run))
| |
| ti_status.to_run.clear() |
| |
| session.close() |
| |
| def test_dag_get_run_dates(self): |
| def get_test_dag_for_backfill(schedule_interval=None): |
| dag = DAG(dag_id='test_get_dates', start_date=DEFAULT_DATE, schedule_interval=schedule_interval) |
| DummyOperator( |
| task_id='dummy', |
| dag=dag, |
| owner='airflow', |
| ) |
| return dag |
| |
| test_dag = get_test_dag_for_backfill() |
| self.assertEqual( |
| [DEFAULT_DATE], test_dag.get_run_dates(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE) |
| ) |
| |
| test_dag = get_test_dag_for_backfill(schedule_interval="@hourly") |
| self.assertEqual( |
| [ |
| DEFAULT_DATE - datetime.timedelta(hours=3), |
| DEFAULT_DATE - datetime.timedelta(hours=2), |
| DEFAULT_DATE - datetime.timedelta(hours=1), |
| DEFAULT_DATE, |
| ], |
| test_dag.get_run_dates( |
| start_date=DEFAULT_DATE - datetime.timedelta(hours=3), |
| end_date=DEFAULT_DATE, |
| ), |
| ) |
| |
| def test_backfill_run_backwards(self): |
| dag = self.dagbag.get_dag("test_start_date_scheduling") |
| dag.clear() |
| |
| executor = MockExecutor(parallelism=16) |
| |
| job = BackfillJob( |
| executor=executor, |
| dag=dag, |
| start_date=DEFAULT_DATE, |
| end_date=DEFAULT_DATE + datetime.timedelta(days=1), |
| run_backwards=True, |
| ) |
| job.run() |
| |
| session = settings.Session() |
| tis = ( |
| session.query(TI) |
            # criteria must be comma-separated; chaining with Python's `and`
            # would silently drop the second condition
            .filter(TI.dag_id == 'test_start_date_scheduling', TI.task_id == 'dummy')
| .order_by(TI.execution_date) |
| .all() |
| ) |
| |
| queued_times = [ti.queued_dttm for ti in tis] |
        self.assertEqual(queued_times, sorted(queued_times, reverse=True))
| self.assertTrue(all(ti.state == State.SUCCESS for ti in tis)) |
| |
| dag.clear() |
| session.close() |
| |
| def test_reset_orphaned_tasks_with_orphans(self): |
| """Create dagruns and ensure only ones with correct states are reset.""" |
| prefix = 'backfill_job_test_test_reset_orphaned_tasks' |
| states = [State.QUEUED, State.SCHEDULED, State.NONE, State.RUNNING, State.SUCCESS] |
| states_to_reset = [State.QUEUED, State.SCHEDULED, State.NONE] |
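        # Only QUEUED, SCHEDULED and NONE are treated as orphaned; task
        # instances in those states inside a RUNNING dag run get reset.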
| |
| dag = DAG(dag_id=prefix, start_date=DEFAULT_DATE, schedule_interval="@daily") |
| tasks = [] |
| for i in range(len(states)): |
| task_id = f"{prefix}_task_{i}" |
| task = DummyOperator(task_id=task_id, dag=dag) |
| tasks.append(task) |
| |
| session = settings.Session() |
| job = BackfillJob(dag=dag) |
| |
| # create dagruns |
| dr1 = dag.create_dagrun(run_id='test1', state=State.RUNNING) |
| dr2 = dag.create_dagrun(run_id='test2', state=State.SUCCESS) |
| |
| # create taskinstances and set states |
| dr1_tis = [] |
| dr2_tis = [] |
| for i, (task, state) in enumerate(zip(tasks, states)): |
| ti1 = TI(task, dr1.execution_date) |
| ti2 = TI(task, dr2.execution_date) |
| ti1.refresh_from_db() |
| ti2.refresh_from_db() |
| ti1.state = state |
| ti2.state = state |
| dr1_tis.append(ti1) |
| dr2_tis.append(ti2) |
| session.merge(ti1) |
| session.merge(ti2) |
| session.commit() |
| |
| self.assertEqual(2, job.reset_state_for_orphaned_tasks()) |
| |
| for ti in dr1_tis + dr2_tis: |
| ti.refresh_from_db() |
| |
| # running dagrun should be reset |
| for state, ti in zip(states, dr1_tis): |
| if state in states_to_reset: |
| self.assertIsNone(ti.state) |
| else: |
| self.assertEqual(state, ti.state) |
| |
| # otherwise not |
| for state, ti in zip(states, dr2_tis): |
| self.assertEqual(state, ti.state) |
| |
| for state, ti in zip(states, dr1_tis): |
| ti.state = state |
| session.commit() |
| |
| job.reset_state_for_orphaned_tasks(filter_by_dag_run=dr1, session=session) |
| |
| # check same for dag_run version |
| for state, ti in zip(states, dr2_tis): |
| self.assertEqual(state, ti.state) |
| |
| def test_reset_orphaned_tasks_specified_dagrun(self): |
| """Try to reset when we specify a dagrun and ensure nothing else is.""" |
| dag_id = 'test_reset_orphaned_tasks_specified_dagrun' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') |
| task_id = dag_id + '_task' |
| DummyOperator(task_id=task_id, dag=dag) |
| |
| job = BackfillJob(dag=dag) |
| session = settings.Session() |
| # make two dagruns, only reset for one |
| dr1 = dag.create_dagrun(run_id='test1', state=State.SUCCESS) |
| dr2 = dag.create_dagrun(run_id='test2', state=State.RUNNING) |
| ti1 = dr1.get_task_instances(session=session)[0] |
| ti2 = dr2.get_task_instances(session=session)[0] |
| ti1.state = State.SCHEDULED |
| ti2.state = State.SCHEDULED |
| |
| session.merge(ti1) |
| session.merge(ti2) |
| session.merge(dr1) |
| session.merge(dr2) |
| session.commit() |
| |
| num_reset_tis = job.reset_state_for_orphaned_tasks(filter_by_dag_run=dr2, session=session) |
| self.assertEqual(1, num_reset_tis) |
| ti1.refresh_from_db(session=session) |
| ti2.refresh_from_db(session=session) |
| self.assertEqual(State.SCHEDULED, ti1.state) |
| self.assertEqual(State.NONE, ti2.state) |
| |
| def test_job_id_is_assigned_to_dag_run(self): |
| dag_id = 'test_job_id_is_assigned_to_dag_run' |
| dag = DAG(dag_id=dag_id, start_date=DEFAULT_DATE, schedule_interval='@daily') |
| DummyOperator(task_id="dummy_task", dag=dag) |
| |
| job = BackfillJob( |
| dag=dag, executor=MockExecutor(), start_date=datetime.datetime.now() - datetime.timedelta(days=1) |
| ) |
| job.run() |
| dr: DagRun = dag.get_last_dagrun() |
| assert dr.creating_job_id == job.id |