#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import datetime
import json
import logging
import threading
from collections import defaultdict
from unittest import mock
from unittest.mock import patch
import pytest
from airflow import settings
from airflow.cli import cli_parser
from airflow.exceptions import (
AirflowException,
AirflowTaskTimeout,
BackfillUnfinished,
DagConcurrencyLimitReached,
NoAvailablePoolSlot,
TaskConcurrencyLimitReached,
)
from airflow.executors.executor_constants import MOCK_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.backfill_job_runner import BackfillJobRunner
from airflow.jobs.job import Job, run_job
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagBag, Pool, TaskInstance as TI
from airflow.models.dagrun import DagRun
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.models.taskmap import TaskMap
from airflow.operators.empty import EmptyOperator
from airflow.utils import timezone
from airflow.utils.session import create_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.timeout import timeout
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import DagRunType
from tests.listeners import dag_listener
from tests.models import TEST_DAGS_FOLDER
from tests.test_utils.db import (
clear_db_dags,
clear_db_pools,
clear_db_runs,
clear_db_xcom,
set_default_pool_slots,
)
from tests.test_utils.mock_executor import MockExecutor
from tests.test_utils.timetables import cron_timetable
pytestmark = pytest.mark.db_test
logger = logging.getLogger(__name__)
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
@pytest.fixture(scope="module")
def dag_bag():
return DagBag(include_examples=True)
# Patch the MockExecutor into the dict of known executors in the Loader
@patch.dict(
ExecutorLoader.executors, {MOCK_EXECUTOR: f"{MockExecutor.__module__}.{MockExecutor.__qualname__}"}
)
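# Context for the patch above (a sketch, assuming the usual loader layout):
# ExecutorLoader.executors maps executor name -> dotted import path, so adding
#   MOCK_EXECUTOR: "tests.test_utils.mock_executor.MockExecutor"
# for the lifetime of this class lets ExecutorLoader resolve the mock executor
# exactly like a built-in one.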
class TestBackfillJob:
@staticmethod
def clean_db():
clear_db_dags()
clear_db_runs()
clear_db_xcom()
clear_db_pools()
@pytest.fixture(autouse=True)
def set_instance_attrs(self, dag_bag):
self.clean_db()
self.parser = cli_parser.get_parser()
self.dagbag = dag_bag
# `airflow tasks run` relies on serialized_dag
for dag in self.dagbag.dags.values():
SerializedDagModel.write_dag(dag)
def _get_dummy_dag(
self,
dag_maker_fixture,
dag_id="test_dag",
pool=Pool.DEFAULT_POOL_NAME,
max_active_tis_per_dag=None,
task_id="op",
**kwargs,
):
with dag_maker_fixture(dag_id=dag_id, schedule="@daily", **kwargs) as dag:
EmptyOperator(task_id=task_id, pool=pool, max_active_tis_per_dag=max_active_tis_per_dag)
return dag
def _times_called_with(self, method, class_):
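        # Count calls whose first positional argument is an instance of `class_`.
        # mock.call_args_list stores (args, kwargs) pairs, so assuming calls of the form
        #   mock_log.debug(DagConcurrencyLimitReached(...))
        # args[0][0] below is the exception instance being inspected.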
count = 0
for args in method.call_args_list:
if isinstance(args[0][0], class_):
count += 1
return count
def test_unfinished_dag_runs_set_to_failed(self, dag_maker):
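        """_set_unfinished_dag_runs_to_failed marks a run with unfinished tasks as FAILED."""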
dag = self._get_dummy_dag(dag_maker)
dag_run = dag_maker.create_dagrun(state=None)
job = Job(executor=MockExecutor())
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=8),
ignore_first_depends_on_past=True,
)
job_runner._set_unfinished_dag_runs_to_failed([dag_run])
dag_run.refresh_from_db()
assert State.FAILED == dag_run.state
def test_dag_run_with_finished_tasks_set_to_success(self, dag_maker):
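        """A run whose tasks all succeeded is resolved to SUCCESS, not forced to FAILED."""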
dag = self._get_dummy_dag(dag_maker)
dag_run = dag_maker.create_dagrun(state=None)
for ti in dag_run.get_task_instances():
ti.set_state(State.SUCCESS)
job = Job(executor=MockExecutor())
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=8),
ignore_first_depends_on_past=True,
)
job_runner._set_unfinished_dag_runs_to_failed([dag_run])
dag_run.refresh_from_db()
assert State.SUCCESS == dag_run.state
@pytest.mark.backend("postgres", "mysql")
def test_trigger_controller_dag(self, session):
dag = self.dagbag.get_dag("example_trigger_controller_dag")
target_dag = self.dagbag.get_dag("example_trigger_target_dag")
target_dag.sync_to_db()
target_dag_run = session.query(DagRun).filter(DagRun.dag_id == target_dag.dag_id).one_or_none()
assert target_dag_run is None
job = Job(executor=MockExecutor())
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE,
ignore_first_depends_on_past=True,
)
run_job(job=job, execute_callable=job_runner._execute)
dag_run = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).one_or_none()
assert dag_run is not None
task_instances_list = job_runner._task_instances_for_dag_run(dag=dag, dag_run=dag_run)
assert task_instances_list
@pytest.mark.backend("postgres", "mysql")
def test_backfill_multi_dates(self):
dag = self.dagbag.get_dag("miscellaneous_test_dag")
end_date = DEFAULT_DATE + datetime.timedelta(days=1)
executor = MockExecutor(parallelism=16)
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=end_date,
ignore_first_depends_on_past=True,
)
run_job(job=job, execute_callable=job_runner._execute)
expected_execution_order = [
("runme_0", DEFAULT_DATE),
("runme_1", DEFAULT_DATE),
("runme_2", DEFAULT_DATE),
("runme_0", end_date),
("runme_1", end_date),
("runme_2", end_date),
("also_run_this", DEFAULT_DATE),
("also_run_this", end_date),
("run_after_loop", DEFAULT_DATE),
("run_after_loop", end_date),
("run_this_last", DEFAULT_DATE),
("run_this_last", end_date),
]
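        # executor.sorted_tasks pairs a task key tuple with its reported (state, info);
        # the key fields are (dag_id, task_id, run_id, try_number, map_index), which is
        # why the expected run_id below is "backfill__<logical date>"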
assert [
((dag.dag_id, task_id, f"backfill__{when.isoformat()}", 1, -1), (State.SUCCESS, None))
for (task_id, when) in expected_execution_order
] == executor.sorted_tasks
session = settings.Session()
drs = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).order_by(DagRun.execution_date).all()
assert drs[0].execution_date == DEFAULT_DATE
assert drs[0].state == State.SUCCESS
assert drs[1].execution_date == DEFAULT_DATE + datetime.timedelta(days=1)
assert drs[1].state == State.SUCCESS
dag.clear()
session.close()
@pytest.mark.backend("postgres", "mysql")
@pytest.mark.parametrize(
"dag_id, expected_execution_order",
[
[
"example_branch_operator",
(
"run_this_first",
"branching",
"branch_a",
"branch_b",
"branch_c",
"branch_d",
"follow_a",
"follow_b",
"follow_c",
"follow_d",
"join",
"branching_ext_python",
"ext_py_a",
"ext_py_b",
"ext_py_c",
"ext_py_d",
"join_ext_python",
"branching_venv",
"venv_a",
"venv_b",
"venv_c",
"venv_d",
"join_venv",
),
],
[
"miscellaneous_test_dag",
("runme_0", "runme_1", "runme_2", "also_run_this", "run_after_loop", "run_this_last"),
],
[
"example_skip_dag",
(
"always_true_1",
"always_true_2",
"skip_operator_1",
"skip_operator_2",
"all_success",
"one_success",
"final_1",
"final_2",
),
],
["latest_only", ("latest_only", "task1")],
],
)
def test_backfill_examples(self, dag_id, expected_execution_order):
"""
Test backfilling example dags
Try to backfill some of the example dags. Be careful, not all dags are suitable
for doing this. For example, a dag that sleeps forever, or does not have a
schedule won't work here since you simply can't backfill them.
"""
dag = self.dagbag.get_dag(dag_id)
logger.info("*** Running example DAG: %s", dag.dag_id)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE,
ignore_first_depends_on_past=True,
)
run_job(job=job, execute_callable=job_runner._execute)
assert [
((dag_id, task_id, f"backfill__{DEFAULT_DATE.isoformat()}", 1, -1), (State.SUCCESS, None))
for task_id in expected_execution_order
] == executor.sorted_tasks
def test_backfill_conf(self, dag_maker):
dag = self._get_dummy_dag(dag_maker, dag_id="test_backfill_conf")
dag_maker.create_dagrun(state=None)
executor = MockExecutor()
conf_ = json.loads("""{"key": "value"}""")
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
conf=conf_,
)
run_job(job=job, execute_callable=job_runner._execute)
        # We ignore the first dag_run created by the fixture
dr = DagRun.find(
dag_id="test_backfill_conf", execution_start_date=DEFAULT_DATE + datetime.timedelta(days=1)
)
assert conf_ == dr[0].conf
@patch("airflow.jobs.backfill_job_runner.BackfillJobRunner.log")
def test_backfill_respect_max_active_tis_per_dag_limit(self, mock_log, dag_maker):
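        """No scheduling wave may exceed the task's max_active_tis_per_dag limit."""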
max_active_tis_per_dag = 2
dag = self._get_dummy_dag(
dag_maker,
dag_id="test_backfill_respect_max_active_tis_per_dag_limit",
max_active_tis_per_dag=max_active_tis_per_dag,
)
dag_maker.create_dagrun(state=None)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=7),
)
run_job(job=job, execute_callable=job_runner._execute)
assert len(executor.history) > 0
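        # MockExecutor is assumed to snapshot each wave of queued task instances into
        # executor.history, so each inner list is one "parallel batch", roughly:
        #   history == [[ti_a, ti_b], [ti_c], ...]  # len(batch) <= concurrency limit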
task_concurrency_limit_reached_at_least_once = False
num_running_task_instances = 0
for running_task_instances in executor.history:
assert len(running_task_instances) <= max_active_tis_per_dag
num_running_task_instances += len(running_task_instances)
if len(running_task_instances) == max_active_tis_per_dag:
task_concurrency_limit_reached_at_least_once = True
assert 8 == num_running_task_instances
assert task_concurrency_limit_reached_at_least_once
times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
DagConcurrencyLimitReached,
)
times_pool_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
NoAvailablePoolSlot,
)
times_task_concurrency_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
TaskConcurrencyLimitReached,
)
assert 0 == times_pool_limit_reached_in_debug
assert 0 == times_dag_concurrency_limit_reached_in_debug
assert times_task_concurrency_limit_reached_in_debug > 0
@pytest.mark.parametrize("with_max_active_tis_per_dag", [False, True])
@patch("airflow.jobs.backfill_job_runner.BackfillJobRunner.log")
def test_backfill_respect_max_active_tis_per_dagrun_limit(
self, mock_log, dag_maker, with_max_active_tis_per_dag
):
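        """No single dag run may exceed max_active_tis_per_dagrun running mapped TIs."""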
max_active_tis_per_dag = 3
max_active_tis_per_dagrun = 2
kwargs = {"max_active_tis_per_dagrun": max_active_tis_per_dagrun}
if with_max_active_tis_per_dag:
kwargs["max_active_tis_per_dag"] = max_active_tis_per_dag
with dag_maker(dag_id="test_backfill_respect_max_active_tis_per_dag_limit", schedule="@daily") as dag:
EmptyOperator.partial(task_id="task1", **kwargs).expand_kwargs([{"x": i} for i in range(10)])
dag_maker.create_dagrun(state=None)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=7),
)
run_job(job=job, execute_callable=job_runner._execute)
assert len(executor.history) > 0
task_concurrency_limit_reached_at_least_once = False
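        # Each item in a history batch is assumed to mirror MockExecutor's queued-task
        # tuples, roughly (command, priority, queue, TaskInstance); index 3 below is the TI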
def get_running_tis_per_dagrun(running_tis):
running_tis_per_dagrun_dict = defaultdict(int)
for running_ti in running_tis:
running_tis_per_dagrun_dict[running_ti[3].dag_run.id] += 1
return running_tis_per_dagrun_dict
num_running_task_instances = 0
for running_task_instances in executor.history:
if with_max_active_tis_per_dag:
assert len(running_task_instances) <= max_active_tis_per_dag
running_tis_per_dagrun_dict = get_running_tis_per_dagrun(running_task_instances)
assert all(
[
num_running_tis <= max_active_tis_per_dagrun
for num_running_tis in running_tis_per_dagrun_dict.values()
]
)
num_running_task_instances += len(running_task_instances)
task_concurrency_limit_reached_at_least_once = (
task_concurrency_limit_reached_at_least_once
or any(
[
num_running_tis == max_active_tis_per_dagrun
for num_running_tis in running_tis_per_dagrun_dict.values()
]
)
)
        assert 80 == num_running_task_instances  # (7 new backfill runs + 1 existing run) * 10 mapped tasks per run
assert task_concurrency_limit_reached_at_least_once
times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
DagConcurrencyLimitReached,
)
times_pool_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
NoAvailablePoolSlot,
)
times_task_concurrency_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
TaskConcurrencyLimitReached,
)
assert 0 == times_pool_limit_reached_in_debug
assert 0 == times_dag_concurrency_limit_reached_in_debug
assert times_task_concurrency_limit_reached_in_debug > 0
@patch("airflow.jobs.backfill_job_runner.BackfillJobRunner.log")
def test_backfill_respect_dag_concurrency_limit(self, mock_log, dag_maker):
dag = self._get_dummy_dag(dag_maker, dag_id="test_backfill_respect_concurrency_limit")
dag_maker.create_dagrun(state=None)
dag.max_active_tasks = 2
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=7),
)
run_job(job=job, execute_callable=job_runner._execute)
assert len(executor.history) > 0
concurrency_limit_reached_at_least_once = False
num_running_task_instances = 0
for running_task_instances in executor.history:
assert len(running_task_instances) <= dag.max_active_tasks
num_running_task_instances += len(running_task_instances)
if len(running_task_instances) == dag.max_active_tasks:
concurrency_limit_reached_at_least_once = True
assert 8 == num_running_task_instances
assert concurrency_limit_reached_at_least_once
times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
DagConcurrencyLimitReached,
)
times_pool_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
NoAvailablePoolSlot,
)
times_task_concurrency_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
TaskConcurrencyLimitReached,
)
assert 0 == times_pool_limit_reached_in_debug
assert 0 == times_task_concurrency_limit_reached_in_debug
assert times_dag_concurrency_limit_reached_in_debug > 0
@patch("airflow.jobs.backfill_job_runner.BackfillJobRunner.log")
def test_backfill_respect_default_pool_limit(self, mock_log, dag_maker):
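        """Tasks without an explicit pool are constrained by the default pool's slot count."""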
default_pool_slots = 2
set_default_pool_slots(default_pool_slots)
dag = self._get_dummy_dag(dag_maker, dag_id="test_backfill_with_no_pool_limit")
dag_maker.create_dagrun(state=None)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=7),
)
run_job(job=job, execute_callable=job_runner._execute)
assert len(executor.history) > 0
default_pool_task_slot_count_reached_at_least_once = False
num_running_task_instances = 0
        # if no pool is specified, the number of tasks running in parallel
        # per backfill should never exceed the default pool's slot count at
        # any point in time
for running_task_instances in executor.history:
assert len(running_task_instances) <= default_pool_slots
num_running_task_instances += len(running_task_instances)
if len(running_task_instances) == default_pool_slots:
default_pool_task_slot_count_reached_at_least_once = True
assert 8 == num_running_task_instances
assert default_pool_task_slot_count_reached_at_least_once
times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
DagConcurrencyLimitReached,
)
times_pool_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
NoAvailablePoolSlot,
)
times_task_concurrency_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
TaskConcurrencyLimitReached,
)
assert 0 == times_dag_concurrency_limit_reached_in_debug
assert 0 == times_task_concurrency_limit_reached_in_debug
assert times_pool_limit_reached_in_debug > 0
def test_backfill_pool_not_found(self, dag_maker):
dag = self._get_dummy_dag(
dag_maker,
dag_id="test_backfill_pool_not_found",
pool="king_pool",
)
dag_maker.create_dagrun(state=None)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=7),
)
        with pytest.raises(AirflowException):
            run_job(job=job, execute_callable=job_runner._execute)
@patch("airflow.jobs.backfill_job_runner.BackfillJobRunner.log")
def test_backfill_respect_pool_limit(self, mock_log, dag_maker):
session = settings.Session()
slots = 2
pool = Pool(
pool="pool_with_two_slots",
slots=slots,
include_deferred=False,
)
session.add(pool)
session.commit()
dag = self._get_dummy_dag(
dag_maker,
dag_id="test_backfill_respect_pool_limit",
pool=pool.pool,
)
dag_maker.create_dagrun(state=None)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=7),
)
run_job(job=job, execute_callable=job_runner._execute)
assert len(executor.history) > 0
pool_was_full_at_least_once = False
num_running_task_instances = 0
for running_task_instances in executor.history:
assert len(running_task_instances) <= slots
num_running_task_instances += len(running_task_instances)
if len(running_task_instances) == slots:
pool_was_full_at_least_once = True
assert 8 == num_running_task_instances
assert pool_was_full_at_least_once
times_dag_concurrency_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
DagConcurrencyLimitReached,
)
times_pool_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
NoAvailablePoolSlot,
)
times_task_concurrency_limit_reached_in_debug = self._times_called_with(
mock_log.debug,
TaskConcurrencyLimitReached,
)
assert 0 == times_task_concurrency_limit_reached_in_debug
assert 0 == times_dag_concurrency_limit_reached_in_debug
assert times_pool_limit_reached_in_debug > 0
def test_backfill_run_rescheduled(self, dag_maker):
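        """An UP_FOR_RESCHEDULE task is rerun when backfilling with rerun_failed_tasks=True."""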
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_run_rescheduled", task_id="test_backfill_run_rescheduled_task-1"
)
dag_maker.create_dagrun(state=None)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
)
run_job(job=job, execute_callable=job_runner._execute)
ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), execution_date=DEFAULT_DATE)
ti.refresh_from_db()
ti.set_state(State.UP_FOR_RESCHEDULE)
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
rerun_failed_tasks=True,
)
run_job(job=job, execute_callable=job_runner._execute)
ti = TI(task=dag.get_task("test_backfill_run_rescheduled_task-1"), execution_date=DEFAULT_DATE)
ti.refresh_from_db()
assert ti.state == State.SUCCESS
def test_backfill_override_conf(self, dag_maker):
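        """The conf passed to the backfill is applied to the pre-existing dag run."""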
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_override_conf", task_id="test_backfill_override_conf-1"
)
dr = dag_maker.create_dagrun(
state=None,
start_date=DEFAULT_DATE,
)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
conf={"a": 1},
)
with patch.object(
job_runner,
"_task_instances_for_dag_run",
wraps=job_runner._task_instances_for_dag_run,
) as wrapped_task_instances_for_dag_run:
run_job(job=job, execute_callable=job_runner._execute)
dr = wrapped_task_instances_for_dag_run.call_args_list[0][0][1]
assert dr.conf == {"a": 1}
def test_backfill_skip_active_scheduled_dagrun(self, dag_maker, caplog):
dag = self._get_dummy_dag(
dag_maker,
dag_id="test_backfill_skip_active_scheduled_dagrun",
task_id="test_backfill_skip_active_scheduled_dagrun-1",
)
dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
)
        with caplog.at_level(logging.ERROR, logger="airflow.jobs.backfill_job_runner.BackfillJobRunner"):
caplog.clear()
run_job(job=job, execute_callable=job_runner._execute)
assert "Backfill cannot be created for DagRun" in caplog.messages[0]
ti = TI(
task=dag.get_task("test_backfill_skip_active_scheduled_dagrun-1"), execution_date=DEFAULT_DATE
)
ti.refresh_from_db()
# since DAG backfill is skipped, task state should be none
assert ti.state == State.NONE
def test_backfill_rerun_failed_tasks(self, dag_maker):
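        """A task forced to FAILED is rerun to SUCCESS when rerun_failed_tasks=True."""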
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1"
)
dag_maker.create_dagrun(state=None)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
)
run_job(job=job, execute_callable=job_runner._execute)
ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), execution_date=DEFAULT_DATE)
ti.refresh_from_db()
ti.set_state(State.FAILED)
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
rerun_failed_tasks=True,
)
run_job(job=job, execute_callable=job_runner._execute)
ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), execution_date=DEFAULT_DATE)
ti.refresh_from_db()
assert ti.state == State.SUCCESS
def test_backfill_rerun_upstream_failed_tasks(self, dag_maker):
with dag_maker(dag_id="test_backfill_rerun_upstream_failed", schedule="@daily") as dag:
op1 = EmptyOperator(task_id="test_backfill_rerun_upstream_failed_task-1")
op2 = EmptyOperator(task_id="test_backfill_rerun_upstream_failed_task-2")
op1.set_upstream(op2)
dag_maker.create_dagrun(state=None)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
)
run_job(job=job, execute_callable=job_runner._execute)
ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), execution_date=DEFAULT_DATE)
ti.refresh_from_db()
ti.set_state(State.UPSTREAM_FAILED)
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
rerun_failed_tasks=True,
)
run_job(job=job, execute_callable=job_runner._execute)
ti = TI(task=dag.get_task("test_backfill_rerun_upstream_failed_task-1"), execution_date=DEFAULT_DATE)
ti.refresh_from_db()
assert ti.state == State.SUCCESS
def test_backfill_rerun_failed_tasks_without_flag(self, dag_maker):
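        """Without rerun_failed_tasks, backfilling over a FAILED task raises instead of rerunning."""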
dag = self._get_dummy_dag(
dag_maker, dag_id="test_backfill_rerun_failed", task_id="test_backfill_rerun_failed_task-1"
)
dag_maker.create_dagrun(state=None)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
)
run_job(job=job, execute_callable=job_runner._execute)
ti = TI(task=dag.get_task("test_backfill_rerun_failed_task-1"), execution_date=DEFAULT_DATE)
ti.refresh_from_db()
ti.set_state(State.FAILED)
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
rerun_failed_tasks=False,
)
with pytest.raises(AirflowException):
run_job(job=job, execute_callable=job_runner._execute)
def test_backfill_retry_intermittent_failed_task(self, dag_maker):
with dag_maker(
dag_id="test_intermittent_failure_job",
schedule="@daily",
default_args={
"retries": 2,
"retry_delay": datetime.timedelta(seconds=0),
},
) as dag:
task1 = EmptyOperator(task_id="task1")
        dr = dag_maker.create_dagrun(state=None)
        executor = MockExecutor(parallelism=16)
        # TaskInstanceKey is keyed by run_id rather than execution_date, so use the run created above
        executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, dr.run_id, try_number=1)] = (
            State.UP_FOR_RETRY
        )
        executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, dr.run_id, try_number=2)] = (
            State.UP_FOR_RETRY
        )
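        # mock_task_results maps a TaskInstanceKey to the state MockExecutor should report
        # for that try: with retries=2 the task gets three tries, so two UP_FOR_RETRY
        # results followed by the executor's default success let the backfill finish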
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
)
run_job(job=job, execute_callable=job_runner._execute)
def test_backfill_retry_always_failed_task(self, dag_maker):
with dag_maker(
dag_id="test_always_failure_job",
schedule="@daily",
default_args={
"retries": 1,
"retry_delay": datetime.timedelta(seconds=0),
},
) as dag:
task1 = EmptyOperator(task_id="task1")
dr = dag_maker.create_dagrun(state=None)
executor = MockExecutor(parallelism=16)
executor.mock_task_results[TaskInstanceKey(dag.dag_id, task1.task_id, dr.run_id, try_number=1)] = (
State.UP_FOR_RETRY
)
executor.mock_task_fail(dag.dag_id, task1.task_id, dr.run_id, try_number=2)
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE,
)
with pytest.raises(BackfillUnfinished):
run_job(job=job, execute_callable=job_runner._execute)
def test_backfill_ordered_concurrent_execute(self, dag_maker):
with dag_maker(
dag_id="test_backfill_ordered_concurrent_execute",
schedule="@daily",
) as dag:
op1 = EmptyOperator(task_id="leave1")
op2 = EmptyOperator(task_id="leave2")
op3 = EmptyOperator(task_id="upstream_level_1")
op4 = EmptyOperator(task_id="upstream_level_2")
op5 = EmptyOperator(task_id="upstream_level_3")
            # dependencies are set in a deliberately shuffled order
op2.set_downstream(op3)
op1.set_downstream(op3)
op4.set_downstream(op5)
op3.set_downstream(op4)
runid0 = f"backfill__{DEFAULT_DATE.isoformat()}"
dag_maker.create_dagrun(run_id=runid0)
executor = MockExecutor(parallelism=16)
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=2),
)
run_job(job=job, execute_callable=job_runner._execute)
runid1 = f"backfill__{(DEFAULT_DATE + datetime.timedelta(days=1)).isoformat()}"
runid2 = f"backfill__{(DEFAULT_DATE + datetime.timedelta(days=2)).isoformat()}"
        # the executor history keeps one batch of task instances per scheduling wave
history = executor.history
assert [sorted(item[-1].key[1:3] for item in batch) for batch in history] == [
[
("leave1", runid0),
("leave1", runid1),
("leave1", runid2),
("leave2", runid0),
("leave2", runid1),
("leave2", runid2),
],
[("upstream_level_1", runid0), ("upstream_level_1", runid1), ("upstream_level_1", runid2)],
[("upstream_level_2", runid0), ("upstream_level_2", runid1), ("upstream_level_2", runid2)],
[("upstream_level_3", runid0), ("upstream_level_3", runid1), ("upstream_level_3", runid2)],
]
def test_backfill_pooled_tasks(self):
"""
Test that queued tasks are executed by BackfillJobRunner
"""
session = settings.Session()
pool = Pool(pool="test_backfill_pooled_task_pool", slots=1, include_deferred=False)
session.add(pool)
session.commit()
session.close()
dag = self.dagbag.get_dag("test_backfill_pooled_task_dag")
dag.clear()
executor = MockExecutor(do_update=True)
job = Job(executor=executor)
job_runner = BackfillJobRunner(job=job, dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
# run with timeout because this creates an infinite loop if not
# caught
try:
with timeout(seconds=5):
run_job(job=job, execute_callable=job_runner._execute)
except AirflowTaskTimeout:
pass
ti = TI(task=dag.get_task("test_backfill_pooled_task"), execution_date=DEFAULT_DATE)
ti.refresh_from_db()
assert ti.state == State.SUCCESS
@pytest.mark.parametrize("ignore_depends_on_past", [True, False])
def test_backfill_depends_on_past_works_independently_on_ignore_depends_on_past(
self, ignore_depends_on_past
):
dag = self.dagbag.get_dag("test_depends_on_past")
dag.clear()
run_date = DEFAULT_DATE + datetime.timedelta(days=5)
job = Job(executor=MockExecutor())
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=run_date,
end_date=run_date,
ignore_first_depends_on_past=ignore_depends_on_past,
)
run_job(job=job, execute_callable=job_runner._execute)
# ti should have succeeded
ti = TI(dag.tasks[0], run_date)
ti.refresh_from_db()
assert ti.state == State.SUCCESS
def test_backfill_depends_on_past_backwards(self):
"""
Test that CLI respects -B argument and raises on interaction with depends_on_past
"""
dag_id = "test_depends_on_past"
start_date = DEFAULT_DATE + datetime.timedelta(days=1)
end_date = start_date + datetime.timedelta(days=1)
kwargs = dict(
start_date=start_date,
end_date=end_date,
)
dag = self.dagbag.get_dag(dag_id)
dag.clear()
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(job=job, dag=dag, ignore_first_depends_on_past=True, **kwargs)
run_job(job=job, execute_callable=job_runner._execute)
ti = TI(dag.get_task("test_dop_task"), end_date)
ti.refresh_from_db()
# runs fine forwards
assert ti.state == State.SUCCESS
# raises backwards
expected_msg = "You cannot backfill backwards because one or more tasks depend_on_past: test_dop_task"
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(job=job, dag=dag, run_backwards=True, **kwargs)
with pytest.raises(AirflowException, match=expected_msg):
run_job(job=job, execute_callable=job_runner._execute)
def test_cli_receives_delay_arg(self):
"""
Tests that the --delay argument is passed correctly to the BackfillJob
"""
dag_id = "example_bash_operator"
run_date = DEFAULT_DATE
args = [
"dags",
"backfill",
dag_id,
"-s",
run_date.isoformat(),
"--delay-on-limit",
"0.5",
]
parsed_args = self.parser.parse_args(args)
assert 0.5 == parsed_args.delay_on_limit
def _get_dag_test_max_active_limits(
self, dag_maker_fixture, dag_id="test_dag", max_active_runs=1, **kwargs
):
with dag_maker_fixture(
dag_id=dag_id,
schedule="@hourly",
max_active_runs=max_active_runs,
**kwargs,
) as dag:
op1 = EmptyOperator(task_id="leave1")
op2 = EmptyOperator(task_id="leave2")
op3 = EmptyOperator(task_id="upstream_level_1")
op4 = EmptyOperator(task_id="upstream_level_2")
op1 >> op2 >> op3
op4 >> op3
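            # resulting graph:
            #   leave1 -> leave2 -> upstream_level_1
            #   upstream_level_2 -> upstream_level_1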
return dag
def test_backfill_max_limit_check_within_limit(self, dag_maker):
dag = self._get_dag_test_max_active_limits(
dag_maker, dag_id="test_backfill_max_limit_check_within_limit", max_active_runs=16
)
dag_maker.create_dagrun(state=None)
start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
end_date = DEFAULT_DATE
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=start_date,
end_date=end_date,
donot_pickle=True,
)
run_job(job=job, execute_callable=job_runner._execute)
dagruns = DagRun.find(dag_id=dag.dag_id)
assert 2 == len(dagruns)
assert all(run.state == State.SUCCESS for run in dagruns)
def test_backfill_notifies_dagrun_listener(self, dag_maker):
dag = self._get_dummy_dag(dag_maker)
dag_run = dag_maker.create_dagrun(state=None)
dag_listener.clear()
get_listener_manager().add_listener(dag_listener)
start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
end_date = DEFAULT_DATE
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=start_date,
end_date=end_date,
donot_pickle=True,
)
job.notification_threadpool = mock.MagicMock()
run_job(job=job, execute_callable=job_runner._execute)
assert len(dag_listener.running) == 1
assert len(dag_listener.success) == 1
assert dag_listener.running[0].dag.dag_id == dag_run.dag.dag_id
assert dag_listener.running[0].run_id == dag_run.run_id
assert dag_listener.running[0].state == DagRunState.RUNNING
assert dag_listener.success[0].dag.dag_id == dag_run.dag.dag_id
assert dag_listener.success[0].run_id == dag_run.run_id
assert dag_listener.success[0].state == DagRunState.SUCCESS
def test_backfill_max_limit_check(self, dag_maker):
dag_id = "test_backfill_max_limit_check"
run_id = "test_dag_run"
start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
end_date = DEFAULT_DATE
dag_run_created_cond = threading.Condition()
def run_backfill(cond):
cond.acquire()
            # this session object is different from the one in the main thread
with create_session() as thread_session:
try:
dag = self._get_dag_test_max_active_limits(
dag_maker,
dag_id=dag_id,
)
dag_maker.create_dagrun(
state=State.RUNNING,
# Existing dagrun that is not within the backfill range
run_id=run_id,
execution_date=DEFAULT_DATE + datetime.timedelta(hours=1),
)
thread_session.commit()
cond.notify()
except Exception:
logger.exception("Exception when creating DagRun")
finally:
cond.release()
thread_session.close()
executor = MockExecutor()
job = Job(
executor=executor,
)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=start_date,
end_date=end_date,
donot_pickle=True,
)
run_job(job=job, execute_callable=job_runner._execute)
backfill_job_thread = threading.Thread(
target=run_backfill, name="run_backfill", args=(dag_run_created_cond,)
)
dag_run_created_cond.acquire()
with create_session() as session:
backfill_job_thread.start()
try:
# at this point backfill can't run since the max_active_runs has been
# reached, so it is waiting
dag_run_created_cond.wait(timeout=1.5)
dagruns = DagRun.find(dag_id=dag_id)
logger.info("The dag runs retrieved: %s", dagruns)
assert 1 == len(dagruns)
dr = dagruns[0]
assert dr.run_id == run_id
# allow the backfill to execute
# by setting the existing dag run to SUCCESS,
# backfill will execute dag runs 1 by 1
dr.set_state(State.SUCCESS)
session.merge(dr)
session.commit()
backfill_job_thread.join()
dagruns = DagRun.find(dag_id=dag_id)
assert 3 == len(dagruns) # 2 from backfill + 1 existing
assert dagruns[-1].run_id == dr.run_id
finally:
dag_run_created_cond.release()
def test_backfill_max_limit_check_no_count_existing(self, dag_maker):
start_date = DEFAULT_DATE
end_date = DEFAULT_DATE
# Existing dagrun that is within the backfill range
dag = self._get_dag_test_max_active_limits(
dag_maker, dag_id="test_backfill_max_limit_check_no_count_existing"
)
dag_maker.create_dagrun(state=None)
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job, dag=dag, start_date=start_date, end_date=end_date, donot_pickle=True
)
run_job(job=job, execute_callable=job_runner._execute)
        # The BackfillJobRunner proceeds because the existing DagRun does not count
        # against max_active_runs: it falls within the backfill date range.
        dagruns = DagRun.find(dag_id=dag.dag_id)
        # start_date == end_date, so only one run is in range (the pre-existing one),
        # and the backfill picks it up and completes it
        assert 1 == len(dagruns)
assert State.SUCCESS == dagruns[0].state
def test_backfill_max_limit_check_complete_loop(self, dag_maker):
dag = self._get_dag_test_max_active_limits(
dag_maker, dag_id="test_backfill_max_limit_check_complete_loop"
)
dag_maker.create_dagrun(state=None)
start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
end_date = DEFAULT_DATE
        # With max_active_runs=1 only one dag run can be active at a time, so the
        # backfill processes the two runs in the range sequentially
        success_expected = 2
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=start_date,
end_date=end_date,
donot_pickle=True,
)
run_job(job=job, execute_callable=job_runner._execute)
success_dagruns = len(DagRun.find(dag_id=dag.dag_id, state=State.SUCCESS))
running_dagruns = len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING))
assert success_expected == success_dagruns
assert 0 == running_dagruns # no dag_runs in running state are left
def test_sub_set_subdag(self, dag_maker):
with dag_maker(
"test_sub_set_subdag",
on_success_callback=lambda _: None,
on_failure_callback=lambda _: None,
) as dag:
op1 = EmptyOperator(task_id="leave1")
op2 = EmptyOperator(task_id="leave2")
op3 = EmptyOperator(task_id="upstream_level_1")
op4 = EmptyOperator(task_id="upstream_level_2")
op5 = EmptyOperator(task_id="upstream_level_3")
            # dependencies are set in a deliberately shuffled order
op2.set_downstream(op3)
op1.set_downstream(op3)
op4.set_downstream(op5)
op3.set_downstream(op4)
dr = dag_maker.create_dagrun(state=None)
executor = MockExecutor()
sub_dag = dag.partial_subset(
task_ids_or_regex="leave*", include_downstream=False, include_upstream=False
)
job = Job(executor=executor)
job_runner = BackfillJobRunner(job=job, dag=sub_dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
run_job(job=job, execute_callable=job_runner._execute)
for ti in dr.get_task_instances():
if ti.task_id == "leave1" or ti.task_id == "leave2":
assert State.SUCCESS == ti.state
else:
assert State.NONE == ti.state
def test_backfill_fill_blanks(self, dag_maker):
with dag_maker(
"test_backfill_fill_blanks",
) as dag:
op1 = EmptyOperator(task_id="op1")
op2 = EmptyOperator(task_id="op2")
op3 = EmptyOperator(task_id="op3")
op4 = EmptyOperator(task_id="op4")
op5 = EmptyOperator(task_id="op5")
op6 = EmptyOperator(task_id="op6")
dr = dag_maker.create_dagrun(state=None)
executor = MockExecutor()
session = settings.Session()
tis = dr.get_task_instances()
for ti in tis:
if ti.task_id == op1.task_id:
ti.state = State.UP_FOR_RETRY
ti.end_date = DEFAULT_DATE
elif ti.task_id == op2.task_id:
ti.state = State.FAILED
elif ti.task_id == op3.task_id:
ti.state = State.SKIPPED
elif ti.task_id == op4.task_id:
ti.state = State.SCHEDULED
elif ti.task_id == op5.task_id:
ti.state = State.UPSTREAM_FAILED
            # op6 is intentionally left in State.NONE
session.merge(ti)
session.commit()
session.close()
job = Job(executor=executor)
job_runner = BackfillJobRunner(job=job, dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
with pytest.raises(AirflowException, match="Some task instances failed"):
run_job(job=job, execute_callable=job_runner._execute)
dr.refresh_from_db()
assert dr.state == State.FAILED
tis = dr.get_task_instances()
for ti in tis:
if ti.task_id in (op1.task_id, op4.task_id, op6.task_id):
assert ti.state == State.SUCCESS
elif ti.task_id == op2.task_id:
assert ti.state == State.FAILED
elif ti.task_id == op3.task_id:
assert ti.state == State.SKIPPED
elif ti.task_id == op5.task_id:
assert ti.state == State.UPSTREAM_FAILED
def test_backfill_execute_subdag(self):
dag = self.dagbag.get_dag("example_subdag_operator")
subdag_op_task = dag.get_task("section-1")
subdag = subdag_op_task.subdag
subdag.timetable = cron_timetable("@daily")
start_date = timezone.utcnow()
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=subdag,
start_date=start_date,
end_date=start_date,
donot_pickle=True,
)
run_job(job=job, execute_callable=job_runner._execute)
subdag_op_task.pre_execute(context={"execution_date": start_date})
subdag_op_task.execute(context={"execution_date": start_date})
subdag_op_task.post_execute(context={"execution_date": start_date})
history = executor.history
subdag_history = history[0]
# check that all 5 task instances of the subdag 'section-1' were executed
assert 5 == len(subdag_history)
for sdh in subdag_history:
ti = sdh[3]
assert "section-1-task-" in ti.task_id
with create_session() as session:
successful_subdag_runs = (
session.query(DagRun)
.filter(DagRun.dag_id == subdag.dag_id)
.filter(DagRun.execution_date == start_date)
.filter(DagRun.state == State.SUCCESS)
.count()
)
assert 1 == successful_subdag_runs
subdag.clear()
dag.clear()
def test_subdag_clear_parentdag_downstream_clear(self):
dag = self.dagbag.get_dag("clear_subdag_test_dag")
subdag_op_task = dag.get_task("daily_job")
subdag = subdag_op_task.subdag
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE,
donot_pickle=True,
)
with timeout(seconds=30):
run_job(job=job, execute_callable=job_runner._execute)
ti_subdag = TI(task=dag.get_task("daily_job"), execution_date=DEFAULT_DATE)
ti_subdag.refresh_from_db()
assert ti_subdag.state == State.SUCCESS
ti_irrelevant = TI(task=dag.get_task("daily_job_irrelevant"), execution_date=DEFAULT_DATE)
ti_irrelevant.refresh_from_db()
assert ti_irrelevant.state == State.SUCCESS
ti_downstream = TI(task=dag.get_task("daily_job_downstream"), execution_date=DEFAULT_DATE)
ti_downstream.refresh_from_db()
assert ti_downstream.state == State.SUCCESS
sdag = subdag.partial_subset(
task_ids_or_regex="daily_job_subdag_task", include_downstream=True, include_upstream=False
)
sdag.clear(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, include_parentdag=True)
ti_subdag.refresh_from_db()
assert State.NONE == ti_subdag.state
ti_irrelevant.refresh_from_db()
assert State.SUCCESS == ti_irrelevant.state
ti_downstream.refresh_from_db()
assert State.NONE == ti_downstream.state
subdag.clear()
dag.clear()
def test_backfill_execute_subdag_with_removed_task(self):
"""
Ensure that subdag operators execute properly in the case where
an associated task of the subdag has been removed from the dag
definition, but has instances in the database from previous runs.
"""
dag = self.dagbag.get_dag("example_subdag_operator")
subdag = dag.get_task("section-1").subdag
session = settings.Session()
executor = MockExecutor()
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=subdag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE,
donot_pickle=True,
)
dr = DagRun(
dag_id=subdag.dag_id, execution_date=DEFAULT_DATE, run_id="test", run_type=DagRunType.BACKFILL_JOB
)
session.add(dr)
removed_task_ti = TI(
task=EmptyOperator(task_id="removed_task"), run_id=dr.run_id, state=State.REMOVED
)
removed_task_ti.dag_id = subdag.dag_id
dr.task_instances.append(removed_task_ti)
session.commit()
with timeout(seconds=30):
run_job(job=job, execute_callable=job_runner._execute)
for task in subdag.tasks:
instance = (
session.query(TI)
.filter(
TI.dag_id == subdag.dag_id, TI.task_id == task.task_id, TI.execution_date == DEFAULT_DATE
)
.first()
)
assert instance is not None
assert instance.state == State.SUCCESS
removed_task_ti.refresh_from_db()
assert removed_task_ti.state == State.REMOVED
subdag.clear()
dag.clear()
def test_update_counters(self, dag_maker, session):
with dag_maker(dag_id="test_manage_executor_state", start_date=DEFAULT_DATE, session=session) as dag:
task1 = EmptyOperator(task_id="dummy", owner="airflow")
dr = dag_maker.create_dagrun(state=None)
job = Job()
job_runner = BackfillJobRunner(job=job, dag=dag)
ti = TI(task1, dr.execution_date)
ti.refresh_from_db()
ti_status = BackfillJobRunner._DagRunTaskStatus()
# Test for success
# The in-memory task key in ti_status.running contains a try_number
# that is not in sync with the DB. To test that _update_counters method
# handles this, we mark the task as running in-memory and then increase
# the try number as it would be before the raw task is executed.
# When updating the counters the in-memory key will be used which will
# match what's in the in-memory ti_status.running map. This is the same
# for skipped, failed and retry states.
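        # For reference, ti.key is a TaskInstanceKey named tuple, roughly:
        #   TaskInstanceKey(dag_id=..., task_id="dummy", run_id=dr.run_id,
        #                   try_number=ti.try_number, map_index=-1)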
ti_status.running[ti.key] = ti # Task is queued and marked as running
ti._try_number += 1 # Try number is increased during ti.run()
ti.set_state(State.SUCCESS, session) # Task finishes with success state
job_runner._update_counters(ti_status=ti_status, session=session) # Update counters
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 1
assert len(ti_status.skipped) == 0
assert len(ti_status.failed) == 0
assert len(ti_status.to_run) == 0
ti_status.succeeded.clear()
# Test for success when DB try_number is off from in-memory expectations
ti_status.running[ti.key] = ti
ti._try_number += 2
ti.set_state(State.SUCCESS, session)
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 1
assert len(ti_status.skipped) == 0
assert len(ti_status.failed) == 0
assert len(ti_status.to_run) == 0
ti_status.succeeded.clear()
# Test for skipped
ti_status.running[ti.key] = ti
ti._try_number += 1
ti.set_state(State.SKIPPED, session)
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 0
assert len(ti_status.skipped) == 1
assert len(ti_status.failed) == 0
assert len(ti_status.to_run) == 0
ti_status.skipped.clear()
# Test for failed
ti_status.running[ti.key] = ti
ti._try_number += 1
ti.set_state(State.FAILED, session)
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 0
assert len(ti_status.skipped) == 0
assert len(ti_status.failed) == 1
assert len(ti_status.to_run) == 0
ti_status.failed.clear()
# Test for retry
ti_status.running[ti.key] = ti
ti._try_number += 1
ti.set_state(State.UP_FOR_RETRY, session)
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 0
assert len(ti_status.skipped) == 0
assert len(ti_status.failed) == 0
assert len(ti_status.to_run) == 1
ti_status.to_run.clear()
# Test for reschedule
# Logic in taskinstance reduces the try number for a task that's been
# rescheduled (which makes sense because it's the _same_ try, but it's
# just being rescheduled to a later time). This now makes the in-memory
# and DB representation of the task try_number the _same_, which is unlike
# the above cases. But this is okay because the in-memory key is used.
ti_status.running[ti.key] = ti # Task queued and marked as running
# Note: Both the increase and decrease are kept here for context
ti._try_number += 1 # Try number is increased during ti.run()
ti._try_number -= 1 # Task is being rescheduled, decrement try_number
ti.set_state(State.UP_FOR_RESCHEDULE, session) # Task finishes with reschedule state
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 0
assert len(ti_status.skipped) == 0
assert len(ti_status.failed) == 0
assert len(ti_status.to_run) == 1
ti_status.to_run.clear()
# test for none
ti.set_state(State.NONE, session)
# Setting ti._try_number = 0 brings us to ti.try_number==1
# so that the in-memory key access will work fine
ti._try_number = 0
assert ti.try_number == 1 # see ti.try_number property in taskinstance module
session.merge(ti)
session.commit()
ti_status.running[ti.key] = ti
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 0
assert len(ti_status.skipped) == 0
assert len(ti_status.failed) == 0
assert len(ti_status.to_run) == 1
ti_status.to_run.clear()
# test for scheduled
ti.set_state(State.SCHEDULED)
# Deferred tasks are put into scheduled by the triggerer
# Check that they are put into to_run
ti_status.running[ti.key] = ti
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 0
assert len(ti_status.succeeded) == 0
assert len(ti_status.skipped) == 0
assert len(ti_status.failed) == 0
assert len(ti_status.to_run) == 1
ti_status.to_run.clear()
# test for deferred
# if a task is deferred and it's not yet time for the triggerer
# to reschedule it, we should leave it in ti_status.running
ti.set_state(State.DEFERRED)
ti_status.running[ti.key] = ti
job_runner._update_counters(ti_status=ti_status, session=session)
assert len(ti_status.running) == 1
assert len(ti_status.succeeded) == 0
assert len(ti_status.skipped) == 0
assert len(ti_status.failed) == 0
assert len(ti_status.to_run) == 0
session.close()
def test_dag_dagrun_infos_between(self, dag_maker):
with dag_maker(
dag_id="dagrun_infos_between", start_date=DEFAULT_DATE, schedule="@hourly"
) as test_dag:
EmptyOperator(
task_id="dummy",
owner="airflow",
)
assert [DEFAULT_DATE] == [
info.logical_date
for info in test_dag.iter_dagrun_infos_between(
earliest=DEFAULT_DATE,
latest=DEFAULT_DATE,
)
]
assert [
DEFAULT_DATE - datetime.timedelta(hours=3),
DEFAULT_DATE - datetime.timedelta(hours=2),
DEFAULT_DATE - datetime.timedelta(hours=1),
DEFAULT_DATE,
] == [
info.logical_date
for info in test_dag.iter_dagrun_infos_between(
earliest=DEFAULT_DATE - datetime.timedelta(hours=3),
latest=DEFAULT_DATE,
)
]
def test_backfill_run_backwards(self):
dag = self.dagbag.get_dag("test_start_date_scheduling")
dag.clear()
executor = MockExecutor(parallelism=16)
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=1),
run_backwards=True,
)
run_job(job=job, execute_callable=job_runner._execute)
session = settings.Session()
tis = (
session.query(TI)
.join(TI.dag_run)
            # the criteria must be separate filter() arguments; chaining them with
            # Python's `and` silently drops the second condition
            .filter(TI.dag_id == "test_start_date_scheduling", TI.task_id == "dummy")
.order_by(DagRun.execution_date)
.all()
)
queued_times = [ti.queued_dttm for ti in tis]
assert queued_times == sorted(queued_times, reverse=True)
assert all(ti.state == State.SUCCESS for ti in tis)
dag.clear()
session.close()
def test_reset_orphaned_tasks_with_orphans(self, dag_maker):
"""Create dagruns and ensure only ones with correct states are reset."""
prefix = "backfill_job_test_test_reset_orphaned_tasks"
states = [State.QUEUED, State.SCHEDULED, State.NONE, State.RUNNING, State.SUCCESS]
states_to_reset = [State.QUEUED, State.SCHEDULED, State.NONE]
tasks = []
with dag_maker(dag_id=prefix) as dag:
for i in range(len(states)):
task_id = f"{prefix}_task_{i}"
task = EmptyOperator(task_id=task_id)
tasks.append(task)
session = settings.Session()
job = Job()
job_runner = BackfillJobRunner(job=job, dag=dag)
# create dagruns
dr1 = dag_maker.create_dagrun(state=State.RUNNING)
dr2 = dag.create_dagrun(run_id="test2", state=State.SUCCESS)
# create taskinstances and set states
dr1_tis = []
dr2_tis = []
for task, state in zip(tasks, states):
ti1 = TI(task, dr1.execution_date)
ti2 = TI(task, dr2.execution_date)
ti1.refresh_from_db()
ti2.refresh_from_db()
ti1.state = state
ti2.state = state
dr1_tis.append(ti1)
dr2_tis.append(ti2)
session.merge(ti1)
session.merge(ti2)
session.commit()
assert 2 == job_runner.reset_state_for_orphaned_tasks()
for ti in dr1_tis + dr2_tis:
ti.refresh_from_db()
# running dagrun should be reset
for state, ti in zip(states, dr1_tis):
if state in states_to_reset:
assert ti.state is None
else:
assert state == ti.state
# otherwise not
for state, ti in zip(states, dr2_tis):
assert state == ti.state
for state, ti in zip(states, dr1_tis):
ti.state = state
session.commit()
job_runner.reset_state_for_orphaned_tasks(filter_by_dag_run=dr1, session=session)
# check same for dag_run version
for state, ti in zip(states, dr2_tis):
assert state == ti.state
def test_reset_orphaned_tasks_specified_dagrun(self, session, dag_maker):
"""Try to reset when we specify a dagrun and ensure nothing else is."""
dag_id = "test_reset_orphaned_tasks_specified_dagrun"
task_id = dag_id + "_task"
with dag_maker(
dag_id=dag_id,
start_date=DEFAULT_DATE,
schedule="@daily",
session=session,
) as dag:
EmptyOperator(task_id=task_id, dag=dag)
job = Job()
job_runner = BackfillJobRunner(job=job, dag=dag)
# make two dagruns, only reset for one
dr1 = dag_maker.create_dagrun(state=State.SUCCESS)
dr2 = dag.create_dagrun(run_id="test2", state=State.RUNNING, session=session)
ti1 = dr1.get_task_instances(session=session)[0]
ti2 = dr2.get_task_instances(session=session)[0]
ti1.state = State.SCHEDULED
ti2.state = State.SCHEDULED
session.merge(ti1)
session.merge(ti2)
session.merge(dr1)
session.merge(dr2)
session.flush()
num_reset_tis = job_runner.reset_state_for_orphaned_tasks(filter_by_dag_run=dr2, session=session)
assert 1 == num_reset_tis
ti1.refresh_from_db(session=session)
ti2.refresh_from_db(session=session)
assert State.SCHEDULED == ti1.state
assert State.NONE == ti2.state
def test_job_id_is_assigned_to_dag_run(self, dag_maker):
dag_id = "test_job_id_is_assigned_to_dag_run"
with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, schedule="@daily") as dag:
EmptyOperator(task_id="dummy_task", dag=dag)
job = Job(executor=MockExecutor())
job_runner = BackfillJobRunner(
job=job, dag=dag, start_date=timezone.utcnow() - datetime.timedelta(days=1)
)
run_job(job=job, execute_callable=job_runner._execute)
dr: DagRun = dag.get_last_dagrun()
assert dr.creating_job_id == job.id
def test_backfill_has_job_id_int(self):
"""Make sure that backfill jobs are assigned job_ids and that the job_id is an int."""
dag = self.dagbag.get_dag("test_start_date_scheduling")
dag.clear()
executor = MockExecutor(parallelism=16)
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE + datetime.timedelta(days=1),
run_backwards=True,
)
run_job(job=job, execute_callable=job_runner._execute)
assert isinstance(executor.job_id, int)
@pytest.mark.long_running
@pytest.mark.parametrize("executor_name", ["SequentialExecutor", "DebugExecutor"])
@pytest.mark.parametrize("dag_id", ["test_mapped_classic", "test_mapped_taskflow", "test_sensor"])
def test_backfilling_dags(self, dag_id, executor_name, session):
"""
End-to-end test for backfilling dags with various executors.
We test with multiple executors as they have different "execution environments" -- for instance
DebugExecutor runs a lot more in the same process than other Executors.
"""
        # This test needs a real executor to run, so that the `make_list` task can write out the TaskMap
self.dagbag.process_file(str(TEST_DAGS_FOLDER / f"{dag_id}.py"))
dag = self.dagbag.get_dag(dag_id)
when = timezone.datetime(2022, 1, 1)
job = Job(executor=ExecutorLoader.load_executor(executor_name))
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=when,
end_date=when,
donot_pickle=True,
)
run_job(job=job, execute_callable=job_runner._execute)
dr = DagRun.find(dag_id=dag.dag_id, execution_date=when, session=session)[0]
assert dr
assert dr.state == DagRunState.SUCCESS
# Check that every task has a start and end date
for ti in dr.task_instances:
assert ti.state == TaskInstanceState.SUCCESS
assert ti.start_date is not None
assert ti.end_date is not None
def test_mapped_dag_pre_existing_tis(self, dag_maker, session):
"""If the DagRun already has some mapped TIs, ensure that we re-run them successfully"""
from airflow.decorators import task
from airflow.operators.python import PythonOperator
list_result = [[1], [2], [{"a": "b"}]]
@task
def make_arg_lists():
return list_result
def consumer(value):
print(repr(value))
with dag_maker(session=session) as dag:
consumer_op = PythonOperator.partial(task_id="consumer", python_callable=consumer).expand(
op_args=make_arg_lists()
)
PythonOperator.partial(task_id="consumer_literal", python_callable=consumer).expand(
op_args=[[1], [2], [3]],
)
dr = dag_maker.create_dagrun()
        # Create the existing mapped TIs -- this is the crucial part of this test
ti = dr.get_task_instance("consumer", session=session)
ti.map_index = 0
for map_index in range(1, 3):
ti = TI(consumer_op, run_id=dr.run_id, map_index=map_index)
session.add(ti)
ti.dag_run = dr
session.flush()
executor = MockExecutor()
ti_status = BackfillJobRunner._DagRunTaskStatus()
ti_status.active_runs.add(dr)
ti_status.to_run = {ti.key: ti for ti in dr.task_instances}
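        # _DagRunTaskStatus is the backfill's in-memory bookkeeping: active_runs tracks
        # the dag runs being processed and to_run holds the TIs still to schedule,
        # keyed by TaskInstanceKey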
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=dr.execution_date,
end_date=dr.execution_date,
donot_pickle=True,
)
executor_change_state = executor.change_state
def on_change_state(key, state, info=None):
if key.task_id == "make_arg_lists":
session.add(
TaskMap(
length=len(list_result),
keys=None,
dag_id=key.dag_id,
run_id=key.run_id,
task_id=key.task_id,
map_index=key.map_index,
)
)
session.flush()
executor_change_state(key, state, info)
with patch.object(executor, "change_state", side_effect=on_change_state):
job_runner._process_backfill_task_instances(
ti_status=ti_status,
executor=job.executor,
start_date=dr.execution_date,
pickle_id=None,
session=session,
)
assert ti_status.failed == set()
assert ti_status.succeeded == {
TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=0),
TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=1),
TaskInstanceKey(dag_id=dr.dag_id, task_id="consumer", run_id="test", try_number=1, map_index=2),
TaskInstanceKey(
dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=0
),
TaskInstanceKey(
dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=1
),
TaskInstanceKey(
dag_id=dr.dag_id, task_id="consumer_literal", run_id="test", try_number=1, map_index=2
),
TaskInstanceKey(
dag_id=dr.dag_id, task_id="make_arg_lists", run_id="test", try_number=1, map_index=-1
),
}
def test_mapped_dag_unexpandable(self, dag_maker, session):
with dag_maker(session=session) as dag:
@dag.task
def get_things():
return [1, 2]
@dag.task
def this_fails() -> None:
raise RuntimeError("sorry!")
@dag.task(trigger_rule=TriggerRule.ALL_DONE)
def consumer(a, b):
print(a, b)
consumer.expand(a=get_things(), b=this_fails())
executor = MockExecutor()
when = timezone.datetime(2022, 1, 1)
job = Job(executor=executor)
job_runner = BackfillJobRunner(job=job, dag=dag, start_date=when, end_date=when, donot_pickle=True)
run_job(job=job, execute_callable=job_runner._execute)
(dr,) = DagRun.find(dag_id=dag.dag_id, execution_date=when, session=session)
assert dr.state == DagRunState.FAILED
        # Check the terminal state of each task instance
        tis = {(ti.task_id, ti.map_index): ti for ti in dr.task_instances}
        assert len(tis) == 3
        assert tis[("get_things", -1)].state == TaskInstanceState.SUCCESS
        assert tis[("this_fails", -1)].state == TaskInstanceState.FAILED
        assert tis[("consumer", -1)].state == TaskInstanceState.UPSTREAM_FAILED
def test_start_date_set_for_resetted_dagruns(self, dag_maker, session, caplog):
with dag_maker() as dag:
EmptyOperator(task_id="task1")
dr = dag_maker.create_dagrun()
dr.state = State.SUCCESS
session.merge(dr)
session.flush()
dag.clear()
job = Job(executor=MockExecutor())
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE,
donot_pickle=True,
)
run_job(job=job, execute_callable=job_runner._execute)
(dr,) = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE, session=session)
assert dr.start_date
assert f"Failed to record duration of {dr}" not in caplog.text
def test_task_instances_are_not_set_to_scheduled_when_dagrun_reset(self, dag_maker, session):
"""Test that when dagrun is reset, task instances are not set to scheduled"""
with dag_maker() as dag:
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
task1 >> task2 >> task3
for i in range(1, 4):
dag_maker.create_dagrun(
run_id=f"test_dagrun_{i}", execution_date=DEFAULT_DATE + datetime.timedelta(days=i)
)
dag.clear()
job = Job(executor=MockExecutor())
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE + datetime.timedelta(days=1),
end_date=DEFAULT_DATE + datetime.timedelta(days=4),
donot_pickle=True,
)
for dr in DagRun.find(dag_id=dag.dag_id, session=session):
tasks_to_run = job_runner._task_instances_for_dag_run(dag, dr, session=session)
states = [ti.state for _, ti in tasks_to_run.items()]
assert TaskInstanceState.SCHEDULED in states
assert State.NONE in states
@pytest.mark.parametrize(
["disable_retry", "try_number", "exception"],
(
(True, 1, BackfillUnfinished),
(False, 2, AirflowException),
),
)
def test_backfill_disable_retry(self, dag_maker, disable_retry, try_number, exception):
with dag_maker(
dag_id="test_disable_retry",
schedule="@daily",
default_args={
"retries": 2,
"retry_delay": datetime.timedelta(seconds=3),
},
) as dag:
task1 = EmptyOperator(task_id="task1")
dag_run = dag_maker.create_dagrun(state=None)
executor = MockExecutor(parallelism=16)
executor.mock_task_results[
TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, try_number=1)
] = TaskInstanceState.UP_FOR_RETRY
executor.mock_task_results[
TaskInstanceKey(dag.dag_id, task1.task_id, dag_run.run_id, try_number=2)
] = TaskInstanceState.FAILED
job = Job(executor=executor)
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE,
disable_retry=disable_retry,
)
with pytest.raises(exception):
run_job(job=job, execute_callable=job_runner._execute)
ti = dag_run.get_task_instance(task_id=task1.task_id)
assert ti._try_number == try_number
dag_run.refresh_from_db()
assert dag_run.state == DagRunState.FAILED
dag.clear()
def test_backfill_failed_dag_with_upstream_failed_task(self, dag_maker):
self.dagbag.process_file(str(TEST_DAGS_FOLDER / "test_backfill_with_upstream_failed_task.py"))
dag = self.dagbag.get_dag("test_backfill_with_upstream_failed_task")
        # We have to fake perform_heartbeat because the original short-circuits on its
        # 'is_unit_test' check. Sleeping for a second is enough to let a LocalExecutor
        # worker cycle complete; this works well for now, but it could be replaced with
        # checking the state of the LocalTaskJob.
def fake_perform_heartbeat(*args, **kwargs):
import time
time.sleep(1)
with mock.patch("airflow.jobs.backfill_job_runner.perform_heartbeat", fake_perform_heartbeat):
job = Job(executor=ExecutorLoader.load_executor("LocalExecutor"))
job_runner = BackfillJobRunner(
job=job,
dag=dag,
start_date=DEFAULT_DATE,
end_date=DEFAULT_DATE,
rerun_failed_tasks=True,
)
with pytest.raises(BackfillUnfinished):
run_job(job=job, execute_callable=job_runner._execute)
dr: DagRun = dag.get_last_dagrun()
assert dr.state == State.FAILED