blob: 491e345649fe1dbc4bc01c1beb1867602d961d3f [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import contextlib
import datetime
import logging
import os
from collections import deque
from datetime import timedelta
from typing import Generator
from unittest import mock
from unittest.mock import MagicMock, PropertyMock, patch
import psutil
import pytest
import time_machine
from sqlalchemy import func, select, update
import airflow.example_dags
from airflow import settings
from airflow.callbacks.callback_requests import DagCallbackRequest, SlaCallbackRequest, TaskCallbackRequest
from airflow.callbacks.database_callback_sink import DatabaseCallbackSink
from airflow.callbacks.pipe_callback_sink import PipeCallbackSink
from airflow.dag_processing.manager import DagFileProcessorAgent
from airflow.datasets import Dataset
from airflow.datasets.manager import DatasetManager
from airflow.exceptions import AirflowException
from airflow.executors.base_executor import BaseExecutor
from airflow.executors.executor_constants import MOCK_EXECUTOR
from airflow.executors.executor_loader import ExecutorLoader
from airflow.jobs.backfill_job_runner import BackfillJobRunner
from airflow.jobs.job import Job, run_job
from airflow.jobs.local_task_job_runner import LocalTaskJobRunner
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.models.dataset import DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.pool import Pool
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.serialization.serialized_objects import SerializedDAG
from airflow.utils import timezone
from airflow.utils.file import list_py_file_paths
from airflow.utils.session import create_session, provide_session
from airflow.utils.state import DagRunState, JobState, State, TaskInstanceState
from airflow.utils.types import DagRunType
from tests.listeners import dag_listener
from tests.listeners.test_listeners import get_listener_manager
from tests.models import TEST_DAGS_FOLDER
from tests.test_utils.asserts import assert_queries_count
from tests.test_utils.config import conf_vars, env_vars
from tests.test_utils.db import (
clear_db_dags,
clear_db_datasets,
clear_db_import_errors,
clear_db_jobs,
clear_db_pools,
clear_db_runs,
clear_db_serialized_dags,
clear_db_sla_miss,
set_default_pool_slots,
)
from tests.test_utils.mock_executor import MockExecutor
from tests.test_utils.mock_operators import CustomOperator
from tests.utils.test_timezone import UTC
pytestmark = pytest.mark.db_test
ROOT_FOLDER = os.path.realpath(
os.path.join(os.path.dirname(os.path.realpath(__file__)), os.pardir, os.pardir)
)
PERF_DAGS_FOLDER = os.path.join(ROOT_FOLDER, "tests", "test_utils", "perf", "dags")
ELASTIC_DAG_FILE = os.path.join(PERF_DAGS_FOLDER, "elastic_dag.py")
TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"]
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
DEFAULT_LOGICAL_DATE = timezone.coerce_datetime(DEFAULT_DATE)
TRY_NUMBER = 1
@pytest.fixture(scope="class")
def disable_load_example():
with conf_vars({("core", "load_examples"): "false"}):
with env_vars({"AIRFLOW__CORE__LOAD_EXAMPLES": "false"}):
yield
@pytest.fixture(scope="module")
def dagbag():
# Ensure the DAGs we are looking at from the DB are up-to-date
non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False)
non_serialized_dagbag.sync_to_db()
return DagBag(read_dags_from_db=True)
@pytest.fixture
def load_examples():
with conf_vars({("core", "load_examples"): "True"}):
yield
# Patch the MockExecutor into the dict of known executors in the Loader
@patch.dict(
ExecutorLoader.executors, {MOCK_EXECUTOR: f"{MockExecutor.__module__}.{MockExecutor.__qualname__}"}
)
@pytest.mark.usefixtures("disable_load_example")
@pytest.mark.need_serialized_dag
class TestSchedulerJob:
@staticmethod
def clean_db():
clear_db_runs()
clear_db_pools()
clear_db_dags()
clear_db_sla_miss()
clear_db_import_errors()
clear_db_jobs()
clear_db_datasets()
# DO NOT try to run clear_db_serialized_dags() here - this will break the tests
# The tests expect DAGs to be fully loaded here via setUpClass method below
@pytest.fixture(autouse=True)
def per_test(self) -> Generator:
self.clean_db()
self.job_runner = None
yield
if self.job_runner and self.job_runner.processor_agent:
self.job_runner.processor_agent.end()
self.job_runner = None
self.clean_db()
@pytest.fixture(autouse=True)
def set_instance_attrs(self, dagbag) -> Generator:
self.dagbag: DagBag = dagbag
# Speed up some tests by not running the tasks, just look at what we
# enqueue!
self.null_exec: MockExecutor | None = MockExecutor()
# Since we don't want to store the code for the DAG defined in this file
with patch("airflow.dag_processing.manager.SerializedDagModel.remove_deleted_dags"), patch(
"airflow.models.dag.DagCode.bulk_sync_to_db"
):
yield
self.null_exec = None
del self.dagbag
@pytest.mark.parametrize(
"configs",
[
{("scheduler", "standalone_dag_processor"): "False"},
{("scheduler", "standalone_dag_processor"): "True"},
],
)
def test_is_alive(self, configs):
with conf_vars(configs):
scheduler_job = Job(heartrate=10, state=State.RUNNING)
self.job_runner = SchedulerJobRunner(scheduler_job)
assert scheduler_job.is_alive()
scheduler_job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=20)
assert scheduler_job.is_alive()
scheduler_job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=31)
assert not scheduler_job.is_alive()
# test because .seconds was used before instead of total_seconds
# internal repr of datetime is (days, seconds)
scheduler_job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(days=1)
assert not scheduler_job.is_alive()
scheduler_job.state = State.SUCCESS
scheduler_job.latest_heartbeat = timezone.utcnow() - datetime.timedelta(seconds=10)
assert (
not scheduler_job.is_alive()
), "Completed jobs even with recent heartbeat should not be alive"
@pytest.mark.parametrize(
"heartrate",
[10, 5],
)
def test_heartrate(self, heartrate):
with conf_vars({("scheduler", "scheduler_heartbeat_sec"): str(heartrate)}):
scheduler_job = Job(executor=self.null_exec)
_ = SchedulerJobRunner(job=scheduler_job)
assert scheduler_job.heartrate == heartrate
def run_single_scheduler_loop_with_no_dags(self, dags_folder):
"""
Utility function that runs a single scheduler loop without actually
changing/scheduling any dags. This is useful to simulate the other side effects of
running a scheduler loop, e.g. to see what parse errors there are in the
dags_folder.
:param dags_folder: the directory to traverse
"""
scheduler_job = Job(
executor=self.null_exec,
num_times_parse_dags=1,
subdir=os.path.join(dags_folder),
)
self.job_runner = SchedulerJobRunner(scheduler_job)
scheduler_job.heartrate = 0
run_job(scheduler_job, execute_callable=self.job_runner._execute)
def test_no_orphan_process_will_be_left(self, tmp_path):
current_process = psutil.Process()
old_children = current_process.children(recursive=True)
scheduler_job = Job(
executor=MockExecutor(do_update=False),
)
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.fspath(tmp_path), num_runs=1)
run_job(scheduler_job, execute_callable=self.job_runner._execute)
# Remove potential noise created by previous tests.
current_children = set(current_process.children(recursive=True)) - set(old_children)
assert not current_children
@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_events(self, mock_stats_incr, mock_task_callback, dag_maker):
dag_id = "test_process_executor_events"
task_id_1 = "dummy_task"
session = settings.Session()
with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
task1 = EmptyOperator(task_id=task_id_1)
ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
mock_stats_incr.reset_mock()
executor = MockExecutor(do_update=False)
task_callback = mock.MagicMock()
mock_task_callback.return_value = task_callback
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
ti1.state = State.QUEUED
session.merge(ti1)
session.commit()
executor.event_buffer[ti1.key] = State.FAILED, None
self.job_runner._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.FAILED
scheduler_job.executor.callback_sink.send.assert_not_called()
self.job_runner.processor_agent.reset_mock()
# ti in success state
ti1.state = State.SUCCESS
session.merge(ti1)
session.commit()
executor.event_buffer[ti1.key] = State.SUCCESS, None
self.job_runner._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.SUCCESS
scheduler_job.executor.callback_sink.send.assert_not_called()
mock_stats_incr.assert_has_calls(
[
mock.call(
"scheduler.tasks.killed_externally",
tags={"dag_id": dag_id, "task_id": ti1.task_id},
),
mock.call("operator_failures_EmptyOperator", tags={"dag_id": dag_id, "task_id": ti1.task_id}),
mock.call("ti_failures", tags={"dag_id": dag_id, "task_id": ti1.task_id}),
],
any_order=True,
)
@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_events_with_no_callback(self, mock_stats_incr, mock_task_callback, dag_maker):
dag_id = "test_process_executor_events_with_no_callback"
task_id = "test_task"
run_id = "test_run"
mock_stats_incr.reset_mock()
executor = MockExecutor(do_update=False)
task_callback = mock.MagicMock()
mock_task_callback.return_value = task_callback
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
session = settings.Session()
with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
task1 = EmptyOperator(task_id=task_id, retries=1)
ti1 = dag_maker.create_dagrun(
run_id=run_id, execution_date=DEFAULT_DATE + timedelta(hours=1)
).get_task_instance(task1.task_id)
mock_stats_incr.reset_mock()
executor = MockExecutor(do_update=False)
task_callback = mock.MagicMock()
mock_task_callback.return_value = task_callback
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
ti1.state = State.QUEUED
session.merge(ti1)
session.commit()
executor.event_buffer[ti1.key] = State.FAILED, None
self.job_runner._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.UP_FOR_RETRY
scheduler_job.executor.callback_sink.send.assert_not_called()
# ti in success state
ti1.state = State.SUCCESS
session.merge(ti1)
session.commit()
executor.event_buffer[ti1.key] = State.SUCCESS, None
self.job_runner._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.SUCCESS
scheduler_job.executor.callback_sink.send.assert_not_called()
mock_stats_incr.assert_has_calls(
[
mock.call(
"scheduler.tasks.killed_externally",
tags={"dag_id": dag_id, "task_id": task_id},
),
mock.call("operator_failures_EmptyOperator", tags={"dag_id": dag_id, "task_id": task_id}),
mock.call("ti_failures", tags={"dag_id": dag_id, "task_id": task_id}),
],
any_order=True,
)
@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_events_with_callback(self, mock_stats_incr, mock_task_callback, dag_maker):
dag_id = "test_process_executor_events_with_callback"
task_id_1 = "dummy_task"
with dag_maker(dag_id=dag_id, fileloc="/test_path1/") as dag:
task1 = EmptyOperator(task_id=task_id_1, on_failure_callback=lambda x: print("hi"))
ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
mock_stats_incr.reset_mock()
executor = MockExecutor(do_update=False)
task_callback = mock.MagicMock()
mock_task_callback.return_value = task_callback
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
session = settings.Session()
ti1.state = State.QUEUED
session.merge(ti1)
session.commit()
executor.event_buffer[ti1.key] = State.FAILED, None
self.job_runner._process_executor_events(session=session)
ti1.refresh_from_db()
# The state will remain in queued here and
# will be set to failed in dag parsing process
assert ti1.state == State.QUEUED
mock_task_callback.assert_called_once_with(
full_filepath=dag.fileloc,
simple_task_instance=mock.ANY,
processor_subdir=None,
msg="Executor reports task instance "
"<TaskInstance: test_process_executor_events_with_callback.dummy_task test [queued]> "
"finished (failed) although the task says it's queued. (Info: None) "
"Was the task killed externally?",
)
scheduler_job.executor.callback_sink.send.assert_called_once_with(task_callback)
scheduler_job.executor.callback_sink.reset_mock()
mock_stats_incr.assert_called_once_with(
"scheduler.tasks.killed_externally",
tags={
"dag_id": "test_process_executor_events_with_callback",
"task_id": "dummy_task",
},
)
@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_event_missing_dag(self, mock_stats_incr, mock_task_callback, dag_maker, caplog):
dag_id = "test_process_executor_events_with_callback"
task_id_1 = "dummy_task"
with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
task1 = EmptyOperator(task_id=task_id_1, on_failure_callback=lambda x: print("hi"))
ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
mock_stats_incr.reset_mock()
executor = MockExecutor(do_update=False)
task_callback = mock.MagicMock()
mock_task_callback.return_value = task_callback
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(scheduler_job)
self.job_runner.dagbag = mock.MagicMock()
self.job_runner.dagbag.get_dag.side_effect = Exception("failed")
self.job_runner.processor_agent = mock.MagicMock()
session = settings.Session()
ti1.state = State.QUEUED
session.merge(ti1)
session.commit()
executor.event_buffer[ti1.key] = State.FAILED, None
self.job_runner._process_executor_events(session=session)
ti1.refresh_from_db()
assert ti1.state == State.FAILED
@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_events_ti_requeued(self, mock_stats_incr, mock_task_callback, dag_maker):
dag_id = "test_process_executor_events_ti_requeued"
task_id_1 = "dummy_task"
session = settings.Session()
with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
task1 = EmptyOperator(task_id=task_id_1)
ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
mock_stats_incr.reset_mock()
executor = MockExecutor(do_update=False)
task_callback = mock.MagicMock()
mock_task_callback.return_value = task_callback
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(scheduler_job)
self.id = 1
self.job_runner.processor_agent = mock.MagicMock()
# ti is queued with another try number - do not fail it
ti1.state = State.QUEUED
ti1.queued_by_job_id = 1
ti1.try_number = 2
session.merge(ti1)
session.commit()
executor.event_buffer[ti1.key.with_try_number(1)] = State.SUCCESS, None
self.job_runner._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
scheduler_job.executor.callback_sink.send.assert_not_called()
# ti is queued by another scheduler - do not fail it
ti1.state = State.QUEUED
ti1.queued_by_job_id = 2
session.merge(ti1)
session.commit()
executor.event_buffer[ti1.key] = State.SUCCESS, None
self.job_runner._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
scheduler_job.executor.callback_sink.send.assert_not_called()
# ti is queued by this scheduler but it is handed back to the executor - do not fail it
ti1.state = State.QUEUED
ti1.queued_by_job_id = 1
session.merge(ti1)
session.commit()
executor.event_buffer[ti1.key] = State.SUCCESS, None
executor.has_task = mock.MagicMock(return_value=True)
self.job_runner._process_executor_events(session=session)
ti1.refresh_from_db(session=session)
assert ti1.state == State.QUEUED
scheduler_job.executor.callback_sink.send.assert_not_called()
mock_stats_incr.assert_not_called()
def test_execute_task_instances_is_paused_wont_execute(self, session, dag_maker):
dag_id = "SchedulerJobTest.test_execute_task_instances_is_paused_wont_execute"
task_id_1 = "dummy_task"
with dag_maker(dag_id=dag_id, session=session) as dag:
EmptyOperator(task_id=task_id_1)
assert isinstance(dag, SerializedDAG)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
dr1 = dag_maker.create_dagrun(run_type=DagRunType.BACKFILL_JOB)
(ti1,) = dr1.task_instances
ti1.state = State.SCHEDULED
self.job_runner._critical_section_enqueue_task_instances(session)
session.flush()
ti1.refresh_from_db(session=session)
assert State.SCHEDULED == ti1.state
session.rollback()
def test_execute_task_instances_backfill_tasks_wont_execute(self, dag_maker):
"""
Tests that backfill tasks won't get executed.
"""
dag_id = "SchedulerJobTest.test_execute_task_instances_backfill_tasks_wont_execute"
task_id_1 = "dummy_task"
with dag_maker(dag_id=dag_id):
task1 = EmptyOperator(task_id=task_id_1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dr1 = dag_maker.create_dagrun(run_type=DagRunType.BACKFILL_JOB)
ti1 = TaskInstance(task1, run_id=dr1.run_id)
ti1.refresh_from_db()
ti1.state = State.SCHEDULED
session.merge(ti1)
session.flush()
assert dr1.is_backfill
self.job_runner._critical_section_enqueue_task_instances(session)
session.flush()
ti1.refresh_from_db()
assert State.SCHEDULED == ti1.state
session.rollback()
@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
def test_setup_callback_sink_not_standalone_dag_processor(self):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
self.job_runner._execute()
assert isinstance(scheduler_job.executor.callback_sink, PipeCallbackSink)
@mock.patch("airflow.executors.executor_loader.ExecutorLoader.init_executors")
@mock.patch("airflow.executors.executor_loader.ExecutorLoader.get_default_executor")
@conf_vars({("scheduler", "standalone_dag_processor"): "False"})
def test_setup_callback_sink_not_standalone_dag_processor_multiple_executors(
self, get_default_executor_mock, init_executors_mock
):
default_executor = mock.MagicMock(slots_available=8)
second_executor = mock.MagicMock(slots_available=8)
init_executors_mock.return_value = [default_executor, second_executor]
get_default_executor_mock.return_value = default_executor
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
self.job_runner._execute()
for executor in scheduler_job.executors:
assert isinstance(executor.callback_sink, PipeCallbackSink)
@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_setup_callback_sink_standalone_dag_processor(self):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
self.job_runner._execute()
assert isinstance(scheduler_job.executor.callback_sink, DatabaseCallbackSink)
@mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
@mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_setup_callback_sink_standalone_dag_processor_multiple_executors(
self, executor_mock, executors_mock
):
default_executor = mock.MagicMock(slots_available=8)
second_executor = mock.MagicMock(slots_available=8)
executor_mock.return_value = default_executor
executors_mock.return_value = [default_executor, second_executor]
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
self.job_runner._execute()
for executor in scheduler_job.executors:
assert isinstance(executor.callback_sink, DatabaseCallbackSink)
@mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
@mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
@conf_vars({("scheduler", "standalone_dag_processor"): "True"})
def test_executor_start_called(self, executor_mock, executors_mock):
default_executor = mock.MagicMock(slots_available=8)
second_executor = mock.MagicMock(slots_available=8)
executor_mock.return_value = default_executor
executors_mock.return_value = [default_executor, second_executor]
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
self.job_runner._execute()
scheduler_job.executor.start.assert_called_once()
for executor in scheduler_job.executors:
executor.start.assert_called_once()
@mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
@mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
def test_executor_job_id_assigned(self, executor_mock, executors_mock):
default_executor = mock.MagicMock(slots_available=8)
second_executor = mock.MagicMock(slots_available=8)
executor_mock.return_value = default_executor
executors_mock.return_value = [default_executor, second_executor]
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
self.job_runner._execute()
assert scheduler_job.executor.job_id == scheduler_job.id
for executor in scheduler_job.executors:
assert executor.job_id == scheduler_job.id
@mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
@mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
def test_executor_debug_dump(self, executor_mock, executors_mock):
default_executor = mock.MagicMock(slots_available=8)
second_executor = mock.MagicMock(slots_available=8)
executor_mock.return_value = default_executor
executors_mock.return_value = [default_executor, second_executor]
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
self.job_runner._debug_dump(1, mock.MagicMock())
for executor in scheduler_job.executors:
executor.debug_dump.assert_called_once()
def test_find_executable_task_instances_backfill(self, dag_maker):
dag_id = "SchedulerJobTest.test_find_executable_task_instances_backfill"
task_id_1 = "dummy"
with dag_maker(dag_id=dag_id, max_active_tasks=16):
task1 = EmptyOperator(task_id=task_id_1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.BACKFILL_JOB, state=State.RUNNING)
ti_backfill = dr2.get_task_instance(task1.task_id)
ti_with_dagrun = dr1.get_task_instance(task1.task_id)
# ti_with_paused
ti_backfill.state = State.SCHEDULED
ti_with_dagrun.state = State.SCHEDULED
session.merge(dr2)
session.merge(ti_backfill)
session.merge(ti_with_dagrun)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 1 == len(res)
res_keys = (x.key for x in res)
assert ti_with_dagrun.key in res_keys
session.rollback()
def test_find_executable_task_instances_pool(self, dag_maker):
dag_id = "SchedulerJobTest.test_find_executable_task_instances_pool"
task_id_1 = "dummy"
task_id_2 = "dummydummy"
session = settings.Session()
with dag_maker(dag_id=dag_id, max_active_tasks=16, session=session):
EmptyOperator(task_id=task_id_1, pool="a", priority_weight=2)
EmptyOperator(task_id=task_id_2, pool="b", priority_weight=1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
tis = [
dr1.get_task_instance(task_id_1, session=session),
dr1.get_task_instance(task_id_2, session=session),
dr2.get_task_instance(task_id_1, session=session),
dr2.get_task_instance(task_id_2, session=session),
]
tis.sort(key=lambda ti: ti.key)
for ti in tis:
ti.state = State.SCHEDULED
session.merge(ti)
pool = Pool(pool="a", slots=1, description="haha", include_deferred=False)
pool2 = Pool(pool="b", slots=100, description="haha", include_deferred=False)
session.add(pool)
session.add(pool2)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
session.flush()
assert 3 == len(res)
res_keys = []
for ti in res:
res_keys.append(ti.key)
assert tis[0].key in res_keys
assert tis[2].key in res_keys
assert tis[3].key in res_keys
session.rollback()
@pytest.mark.parametrize(
"state, total_executed_ti",
[
(DagRunState.SUCCESS, 0),
(DagRunState.FAILED, 0),
(DagRunState.RUNNING, 2),
(DagRunState.QUEUED, 0),
],
)
def test_find_executable_task_instances_only_running_dagruns(
self, state, total_executed_ti, dag_maker, session
):
"""Test that only task instances of 'running' dagruns are executed"""
dag_id = "SchedulerJobTest.test_find_executable_task_instances_only_running_dagruns"
task_id_1 = "dummy"
task_id_2 = "dummydummy"
with dag_maker(dag_id=dag_id, session=session):
EmptyOperator(task_id=task_id_1)
EmptyOperator(task_id=task_id_2)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
dr = dag_maker.create_dagrun(state=state)
tis = dr.task_instances
for ti in tis:
ti.state = State.SCHEDULED
session.merge(ti)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
session.flush()
assert total_executed_ti == len(res)
def test_find_executable_task_instances_order_execution_date(self, dag_maker):
"""
Test that task instances follow execution_date order priority. If two dagruns with
different execution dates are scheduled, tasks with earliest dagrun execution date will first
be executed
"""
dag_id_1 = "SchedulerJobTest.test_find_executable_task_instances_order_execution_date-a"
dag_id_2 = "SchedulerJobTest.test_find_executable_task_instances_order_execution_date-b"
task_id = "task-a"
session = settings.Session()
with dag_maker(dag_id=dag_id_1, max_active_tasks=16, session=session):
EmptyOperator(task_id=task_id)
dr1 = dag_maker.create_dagrun(execution_date=DEFAULT_DATE + timedelta(hours=1))
with dag_maker(dag_id=dag_id_2, max_active_tasks=16, session=session):
EmptyOperator(task_id=task_id)
dr2 = dag_maker.create_dagrun()
dr1 = session.merge(dr1, load=False)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
tis = dr1.task_instances + dr2.task_instances
for ti in tis:
ti.state = State.SCHEDULED
session.merge(ti)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
session.flush()
assert [ti.key for ti in res] == [tis[1].key]
session.rollback()
def test_find_executable_task_instances_order_priority(self, dag_maker):
dag_id_1 = "SchedulerJobTest.test_find_executable_task_instances_order_priority-a"
dag_id_2 = "SchedulerJobTest.test_find_executable_task_instances_order_priority-b"
task_id = "task-a"
session = settings.Session()
with dag_maker(dag_id=dag_id_1, max_active_tasks=16, session=session):
EmptyOperator(task_id=task_id, priority_weight=1)
dr1 = dag_maker.create_dagrun()
with dag_maker(dag_id=dag_id_2, max_active_tasks=16, session=session):
EmptyOperator(task_id=task_id, priority_weight=4)
dr2 = dag_maker.create_dagrun()
dr1 = session.merge(dr1, load=False)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
tis = dr1.task_instances + dr2.task_instances
for ti in tis:
ti.state = State.SCHEDULED
session.merge(ti)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
session.flush()
assert [ti.key for ti in res] == [tis[1].key]
session.rollback()
def test_find_executable_task_instances_order_priority_with_pools(self, dag_maker):
"""
The scheduler job should pick tasks with higher priority for execution
even if different pools are involved.
"""
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dag_id = "SchedulerJobTest.test_find_executable_task_instances_order_priority_with_pools"
session.add(Pool(pool="pool1", slots=32, include_deferred=False))
session.add(Pool(pool="pool2", slots=32, include_deferred=False))
with dag_maker(dag_id=dag_id, max_active_tasks=2):
op1 = EmptyOperator(task_id="dummy1", priority_weight=1, pool="pool1")
op2 = EmptyOperator(task_id="dummy2", priority_weight=2, pool="pool2")
op3 = EmptyOperator(task_id="dummy3", priority_weight=3, pool="pool1")
dag_run = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
ti1 = dag_run.get_task_instance(op1.task_id, session)
ti2 = dag_run.get_task_instance(op2.task_id, session)
ti3 = dag_run.get_task_instance(op3.task_id, session)
ti1.state = State.SCHEDULED
ti2.state = State.SCHEDULED
ti3.state = State.SCHEDULED
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 2 == len(res)
assert ti3.key == res[0].key
assert ti2.key == res[1].key
session.rollback()
def test_find_executable_task_instances_order_execution_date_and_priority(self, dag_maker):
dag_id_1 = "SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-a"
dag_id_2 = "SchedulerJobTest.test_find_executable_task_instances_order_execution_date_and_priority-b"
task_id = "task-a"
session = settings.Session()
with dag_maker(dag_id=dag_id_1, max_active_tasks=16, session=session):
EmptyOperator(task_id=task_id, priority_weight=1)
dr1 = dag_maker.create_dagrun()
with dag_maker(dag_id=dag_id_2, max_active_tasks=16, session=session):
EmptyOperator(task_id=task_id, priority_weight=4)
dr2 = dag_maker.create_dagrun(execution_date=DEFAULT_DATE + timedelta(hours=1))
dr1 = session.merge(dr1, load=False)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
tis = dr1.task_instances + dr2.task_instances
for ti in tis:
ti.state = State.SCHEDULED
session.merge(ti)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
session.flush()
assert [ti.key for ti in res] == [tis[1].key]
session.rollback()
def test_find_executable_task_instances_in_default_pool(self, dag_maker):
set_default_pool_slots(1)
dag_id = "SchedulerJobTest.test_find_executable_task_instances_in_default_pool"
with dag_maker(dag_id=dag_id):
op1 = EmptyOperator(task_id="dummy1")
op2 = EmptyOperator(task_id="dummy2")
scheduler_job = Job(executor=MockExecutor())
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
session = settings.Session()
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED, state=State.RUNNING)
ti1 = dr1.get_task_instance(op1.task_id, session)
ti2 = dr2.get_task_instance(op2.task_id, session)
ti1.state = State.SCHEDULED
ti2.state = State.SCHEDULED
session.flush()
# Two tasks w/o pool up for execution and our default pool size is 1
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 1 == len(res)
ti2.state = State.RUNNING
session.flush()
# One task w/o pool up for execution and one task running
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 0 == len(res)
session.rollback()
session.close()
def test_queued_task_instances_fails_with_missing_dag(self, dag_maker, session):
"""Check that task instances of missing DAGs are failed"""
dag_id = "SchedulerJobTest.test_find_executable_task_instances_not_in_dagbag"
task_id_1 = "dummy"
task_id_2 = "dummydummy"
with dag_maker(dag_id=dag_id, session=session, default_args={"max_active_tis_per_dag": 1}):
EmptyOperator(task_id=task_id_1)
EmptyOperator(task_id=task_id_2)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.dagbag = mock.MagicMock()
self.job_runner.dagbag.get_dag.return_value = None
dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
tis = dr.task_instances
for ti in tis:
ti.state = State.SCHEDULED
session.merge(ti)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
session.flush()
assert 0 == len(res)
tis = dr.get_task_instances(session=session)
assert len(tis) == 2
assert all(ti.state == State.FAILED for ti in tis)
def test_nonexistent_pool(self, dag_maker):
dag_id = "SchedulerJobTest.test_nonexistent_pool"
with dag_maker(dag_id=dag_id, max_active_tasks=16):
EmptyOperator(task_id="dummy_wrong_pool", pool="this_pool_doesnt_exist")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dr = dag_maker.create_dagrun()
ti = dr.task_instances[0]
ti.state = State.SCHEDULED
session.merge(ti)
session.commit()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
session.flush()
assert 0 == len(res)
session.rollback()
def test_infinite_pool(self, dag_maker):
dag_id = "SchedulerJobTest.test_infinite_pool"
with dag_maker(dag_id=dag_id, max_active_tasks=16):
EmptyOperator(task_id="dummy", pool="infinite_pool")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dr = dag_maker.create_dagrun()
ti = dr.task_instances[0]
ti.state = State.SCHEDULED
session.merge(ti)
infinite_pool = Pool(
pool="infinite_pool",
slots=-1,
description="infinite pool",
include_deferred=False,
)
session.add(infinite_pool)
session.commit()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
session.flush()
assert 1 == len(res)
session.rollback()
def test_not_enough_pool_slots(self, caplog, dag_maker):
dag_id = "SchedulerJobTest.test_test_not_enough_pool_slots"
with dag_maker(dag_id=dag_id, max_active_tasks=16):
EmptyOperator(task_id="cannot_run", pool="some_pool", pool_slots=4)
EmptyOperator(task_id="can_run", pool="some_pool", pool_slots=1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dr = dag_maker.create_dagrun()
ti = dr.task_instances[0]
ti.state = State.SCHEDULED
session.merge(ti)
ti = dr.task_instances[1]
ti.state = State.SCHEDULED
session.merge(ti)
some_pool = Pool(pool="some_pool", slots=2, description="my pool", include_deferred=False)
session.add(some_pool)
session.commit()
with caplog.at_level(logging.WARNING):
self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert (
"Not executing <TaskInstance: "
"SchedulerJobTest.test_test_not_enough_pool_slots.cannot_run test [scheduled]>. "
"Requested pool slots (4) are greater than total pool slots: '2' for pool: some_pool"
in caplog.text
)
assert (
session.query(TaskInstance)
.filter(TaskInstance.dag_id == dag_id, TaskInstance.state == State.SCHEDULED)
.count()
== 1
)
assert (
session.query(TaskInstance)
.filter(TaskInstance.dag_id == dag_id, TaskInstance.state == State.QUEUED)
.count()
== 1
)
session.flush()
session.rollback()
def test_find_executable_task_instances_none(self, dag_maker):
dag_id = "SchedulerJobTest.test_find_executable_task_instances_none"
task_id_1 = "dummy"
with dag_maker(dag_id=dag_id, max_active_tasks=16):
EmptyOperator(task_id=task_id_1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
assert 0 == len(self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session))
session.rollback()
def test_tis_for_queued_dagruns_are_not_run(self, dag_maker):
"""
This tests that tis from queued dagruns are not queued
"""
dag_id = "test_tis_for_queued_dagruns_are_not_run"
task_id_1 = "dummy"
with dag_maker(dag_id):
task1 = EmptyOperator(task_id=task_id_1)
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED)
dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
ti1 = TaskInstance(task1, run_id=dr1.run_id)
ti2 = TaskInstance(task1, run_id=dr2.run_id)
ti1.state = State.SCHEDULED
ti2.state = State.SCHEDULED
session.merge(ti1)
session.merge(ti2)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 1 == len(res)
assert ti2.key == res[0].key
ti1.refresh_from_db()
ti2.refresh_from_db()
assert ti1.state == State.SCHEDULED
assert ti2.state == State.QUEUED
def test_find_executable_task_instances_concurrency(self, dag_maker):
dag_id = "SchedulerJobTest.test_find_executable_task_instances_concurrency"
session = settings.Session()
with dag_maker(dag_id=dag_id, max_active_tasks=2, session=session):
EmptyOperator(task_id="dummy")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
dr3 = dag_maker.create_dagrun_after(dr2, run_type=DagRunType.SCHEDULED)
ti1 = dr1.task_instances[0]
ti2 = dr2.task_instances[0]
ti3 = dr3.task_instances[0]
ti1.state = State.RUNNING
ti2.state = State.SCHEDULED
ti3.state = State.SCHEDULED
session.merge(ti1)
session.merge(ti2)
session.merge(ti3)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 1 == len(res)
res_keys = (x.key for x in res)
assert ti2.key in res_keys
ti2.state = State.RUNNING
session.merge(ti2)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 0 == len(res)
session.rollback()
def test_find_executable_task_instances_concurrency_queued(self, dag_maker):
dag_id = "SchedulerJobTest.test_find_executable_task_instances_concurrency_queued"
with dag_maker(dag_id=dag_id, max_active_tasks=3):
task1 = EmptyOperator(task_id="dummy1")
task2 = EmptyOperator(task_id="dummy2")
task3 = EmptyOperator(task_id="dummy3")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dag_run = dag_maker.create_dagrun()
ti1 = dag_run.get_task_instance(task1.task_id)
ti2 = dag_run.get_task_instance(task2.task_id)
ti3 = dag_run.get_task_instance(task3.task_id)
ti1.state = State.RUNNING
ti2.state = State.QUEUED
ti3.state = State.SCHEDULED
session.merge(ti1)
session.merge(ti2)
session.merge(ti3)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 1 == len(res)
assert res[0].key == ti3.key
session.rollback()
# TODO: This is a hack, I think I need to just remove the setting and have it on always
def test_find_executable_task_instances_max_active_tis_per_dag(self, dag_maker):
dag_id = "SchedulerJobTest.test_find_executable_task_instances_max_active_tis_per_dag"
task_id_1 = "dummy"
task_id_2 = "dummy2"
with dag_maker(dag_id=dag_id, max_active_tasks=16):
task1 = EmptyOperator(task_id=task_id_1, max_active_tis_per_dag=2)
task2 = EmptyOperator(task_id=task_id_2)
executor = MockExecutor(do_update=True)
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
session = settings.Session()
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
dr3 = dag_maker.create_dagrun_after(dr2, run_type=DagRunType.SCHEDULED)
ti1_1 = dr1.get_task_instance(task1.task_id)
ti2 = dr1.get_task_instance(task2.task_id)
ti1_1.state = State.SCHEDULED
ti2.state = State.SCHEDULED
session.merge(ti1_1)
session.merge(ti2)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 2 == len(res)
ti1_1.state = State.RUNNING
ti2.state = State.RUNNING
ti1_2 = dr2.get_task_instance(task1.task_id)
ti1_2.state = State.SCHEDULED
session.merge(ti1_1)
session.merge(ti2)
session.merge(ti1_2)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 1 == len(res)
ti1_2.state = State.RUNNING
ti1_3 = dr3.get_task_instance(task1.task_id)
ti1_3.state = State.SCHEDULED
session.merge(ti1_2)
session.merge(ti1_3)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 0 == len(res)
ti1_1.state = State.SCHEDULED
ti1_2.state = State.SCHEDULED
ti1_3.state = State.SCHEDULED
session.merge(ti1_1)
session.merge(ti1_2)
session.merge(ti1_3)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 2 == len(res)
ti1_1.state = State.RUNNING
ti1_2.state = State.SCHEDULED
ti1_3.state = State.SCHEDULED
session.merge(ti1_1)
session.merge(ti1_2)
session.merge(ti1_3)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 1 == len(res)
session.rollback()
def test_change_state_for_executable_task_instances_no_tis_with_state(self, dag_maker):
dag_id = "SchedulerJobTest.test_change_state_for__no_tis_with_state"
task_id_1 = "dummy"
with dag_maker(dag_id=dag_id, max_active_tasks=2):
task1 = EmptyOperator(task_id=task_id_1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
dr3 = dag_maker.create_dagrun_after(dr2, run_type=DagRunType.SCHEDULED)
ti1 = dr1.get_task_instance(task1.task_id)
ti2 = dr2.get_task_instance(task1.task_id)
ti3 = dr3.get_task_instance(task1.task_id)
ti1.state = State.RUNNING
ti2.state = State.RUNNING
ti3.state = State.RUNNING
session.merge(ti1)
session.merge(ti2)
session.merge(ti3)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=100, session=session)
assert 0 == len(res)
session.rollback()
def test_find_executable_task_instances_not_enough_pool_slots_for_first(self, dag_maker):
set_default_pool_slots(1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dag_id = "SchedulerJobTest.test_find_executable_task_instances_not_enough_pool_slots_for_first"
with dag_maker(dag_id=dag_id):
op1 = EmptyOperator(task_id="dummy1", priority_weight=2, pool_slots=2)
op2 = EmptyOperator(task_id="dummy2", priority_weight=1, pool_slots=1)
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
ti1 = dr1.get_task_instance(op1.task_id, session)
ti2 = dr1.get_task_instance(op2.task_id, session)
ti1.state = State.SCHEDULED
ti2.state = State.SCHEDULED
session.flush()
# Schedule ti with lower priority,
# because the one with higher priority is limited by a concurrency limit
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 1 == len(res)
assert res[0].key == ti2.key
session.rollback()
def test_find_executable_task_instances_not_enough_dag_concurrency_for_first(self, dag_maker):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dag_id_1 = (
"SchedulerJobTest.test_find_executable_task_instances_not_enough_dag_concurrency_for_first-a"
)
dag_id_2 = (
"SchedulerJobTest.test_find_executable_task_instances_not_enough_dag_concurrency_for_first-b"
)
with dag_maker(dag_id=dag_id_1, max_active_tasks=1):
op1a = EmptyOperator(task_id="dummy1-a", priority_weight=2)
op1b = EmptyOperator(task_id="dummy1-b", priority_weight=2)
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
with dag_maker(dag_id=dag_id_2):
op2 = EmptyOperator(task_id="dummy2", priority_weight=1)
dr2 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
ti1a = dr1.get_task_instance(op1a.task_id, session)
ti1b = dr1.get_task_instance(op1b.task_id, session)
ti2 = dr2.get_task_instance(op2.task_id, session)
ti1a.state = State.RUNNING
ti1b.state = State.SCHEDULED
ti2.state = State.SCHEDULED
session.flush()
# Schedule ti with lower priority,
# because the one with higher priority is limited by a concurrency limit
res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
assert 1 == len(res)
assert res[0].key == ti2.key
session.rollback()
def test_find_executable_task_instances_not_enough_task_concurrency_for_first(self, dag_maker):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dag_id = "SchedulerJobTest.test_find_executable_task_instances_not_enough_task_concurrency_for_first"
with dag_maker(dag_id=dag_id):
op1a = EmptyOperator(task_id="dummy1-a", priority_weight=2, max_active_tis_per_dag=1)
op1b = EmptyOperator(task_id="dummy1-b", priority_weight=1)
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
ti1a = dr1.get_task_instance(op1a.task_id, session)
ti1b = dr1.get_task_instance(op1b.task_id, session)
ti2a = dr2.get_task_instance(op1a.task_id, session)
ti1a.state = State.RUNNING
ti1b.state = State.SCHEDULED
ti2a.state = State.SCHEDULED
session.flush()
# Schedule ti with lower priority,
# because the one with higher priority is limited by a concurrency limit
res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
assert 1 == len(res)
assert res[0].key == ti1b.key
session.rollback()
def test_find_executable_task_instances_task_concurrency_per_dagrun_for_first(self, dag_maker):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dag_id = "SchedulerJobTest.test_find_executable_task_instances_task_concurrency_per_dagrun_for_first"
with dag_maker(dag_id=dag_id):
op1a = EmptyOperator(task_id="dummy1-a", priority_weight=2, max_active_tis_per_dagrun=1)
op1b = EmptyOperator(task_id="dummy1-b", priority_weight=1)
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
ti1a = dr1.get_task_instance(op1a.task_id, session)
ti1b = dr1.get_task_instance(op1b.task_id, session)
ti2a = dr2.get_task_instance(op1a.task_id, session)
ti1a.state = State.RUNNING
ti1b.state = State.SCHEDULED
ti2a.state = State.SCHEDULED
session.flush()
# Schedule ti with higher priority,
# because it's running in a different DAG run with 0 active tis
res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
assert 1 == len(res)
assert res[0].key == ti2a.key
session.rollback()
def test_find_executable_task_instances_not_enough_task_concurrency_per_dagrun_for_first(self, dag_maker):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dag_id = (
"SchedulerJobTest"
".test_find_executable_task_instances_not_enough_task_concurrency_per_dagrun_for_first"
)
with dag_maker(dag_id=dag_id):
op1a = EmptyOperator.partial(
task_id="dummy1-a", priority_weight=2, max_active_tis_per_dagrun=1
).expand_kwargs([{"inputs": 1}, {"inputs": 2}])
op1b = EmptyOperator(task_id="dummy1-b", priority_weight=1)
dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
ti1a0 = dr.get_task_instance(op1a.task_id, session, map_index=0)
ti1a1 = dr.get_task_instance(op1a.task_id, session, map_index=1)
ti1b = dr.get_task_instance(op1b.task_id, session)
ti1a0.state = State.RUNNING
ti1a1.state = State.SCHEDULED
ti1b.state = State.SCHEDULED
session.flush()
# Schedule ti with lower priority,
# because the one with higher priority is limited by a concurrency limit
res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
assert 1 == len(res)
assert res[0].key == ti1b.key
session.rollback()
def test_find_executable_task_instances_negative_open_pool_slots(self, dag_maker):
"""
Pools with negative open slots should not block other pools.
Negative open slots can happen when reducing the number of total slots in a pool
while tasks are running in that pool.
"""
set_default_pool_slots(0)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
pool1 = Pool(pool="pool1", slots=1, include_deferred=False)
pool2 = Pool(pool="pool2", slots=1, include_deferred=False)
session.add(pool1)
session.add(pool2)
dag_id = "SchedulerJobTest.test_find_executable_task_instances_negative_open_pool_slots"
with dag_maker(dag_id=dag_id):
op1 = EmptyOperator(task_id="op1", pool="pool1")
op2 = EmptyOperator(task_id="op2", pool="pool2", pool_slots=2)
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
ti1 = dr1.get_task_instance(op1.task_id, session)
ti2 = dr1.get_task_instance(op2.task_id, session)
ti1.state = State.SCHEDULED
ti2.state = State.RUNNING
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=1, session=session)
assert 1 == len(res)
assert res[0].key == ti1.key
session.rollback()
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.gauge")
def test_emit_pool_starving_tasks_metrics(self, mock_stats_gauge, dag_maker):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dag_id = "SchedulerJobTest.test_emit_pool_starving_tasks_metrics"
with dag_maker(dag_id=dag_id):
op = EmptyOperator(task_id="op", pool_slots=2)
dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
ti = dr.get_task_instance(op.task_id, session)
ti.state = State.SCHEDULED
set_default_pool_slots(1)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 0 == len(res)
mock_stats_gauge.assert_has_calls(
[
mock.call("scheduler.tasks.starving", 1),
mock.call(f"pool.starving_tasks.{Pool.DEFAULT_POOL_NAME}", 1),
mock.call("pool.starving_tasks", 1, tags={"pool_name": Pool.DEFAULT_POOL_NAME}),
],
any_order=True,
)
mock_stats_gauge.reset_mock()
set_default_pool_slots(2)
session.flush()
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert 1 == len(res)
mock_stats_gauge.assert_has_calls(
[
mock.call("scheduler.tasks.starving", 0),
mock.call(f"pool.starving_tasks.{Pool.DEFAULT_POOL_NAME}", 0),
mock.call("pool.starving_tasks", 0, tags={"pool_name": Pool.DEFAULT_POOL_NAME}),
],
any_order=True,
)
session.rollback()
session.close()
def test_enqueue_task_instances_with_queued_state(self, dag_maker, session):
dag_id = "SchedulerJobTest.test_enqueue_task_instances_with_queued_state"
task_id_1 = "dummy"
session = settings.Session()
with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, session=session):
task1 = EmptyOperator(task_id=task_id_1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
dr1 = dag_maker.create_dagrun()
ti1 = dr1.get_task_instance(task1.task_id, session)
with patch.object(BaseExecutor, "queue_command") as mock_queue_command:
self.job_runner._enqueue_task_instances_with_queued_state([ti1], session=session)
assert mock_queue_command.called
session.rollback()
@pytest.mark.parametrize("state", [State.FAILED, State.SUCCESS])
def test_enqueue_task_instances_sets_ti_state_to_None_if_dagrun_in_finish_state(self, state, dag_maker):
"""This tests that task instances whose dagrun is in finished state are not queued"""
dag_id = "SchedulerJobTest.test_enqueue_task_instances_with_queued_state"
task_id_1 = "dummy"
session = settings.Session()
with dag_maker(dag_id=dag_id, start_date=DEFAULT_DATE, session=session):
task1 = EmptyOperator(task_id=task_id_1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
dr1 = dag_maker.create_dagrun(state=state)
ti = dr1.get_task_instance(task1.task_id, session)
ti.state = State.SCHEDULED
session.merge(ti)
session.commit()
with patch.object(BaseExecutor, "queue_command") as mock_queue_command:
self.job_runner._enqueue_task_instances_with_queued_state([ti], session=session)
session.flush()
ti.refresh_from_db(session=session)
assert ti.state == State.NONE
mock_queue_command.assert_not_called()
def test_critical_section_enqueue_task_instances(self, dag_maker):
dag_id = "SchedulerJobTest.test_execute_task_instances"
task_id_1 = "dummy_task"
task_id_2 = "dummy_task_nonexistent_queue"
session = settings.Session()
# important that len(tasks) is less than max_active_tasks
# because before scheduler._execute_task_instances would only
# check the num tasks once so if max_active_tasks was 3,
# we could execute arbitrarily many tasks in the second run
with dag_maker(dag_id=dag_id, max_active_tasks=3, session=session) as dag:
task1 = EmptyOperator(task_id=task_id_1)
task2 = EmptyOperator(task_id=task_id_2)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
# create first dag run with 1 running and 1 queued
dr1 = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
ti1 = dr1.get_task_instance(task1.task_id, session)
ti2 = dr1.get_task_instance(task2.task_id, session)
ti1.state = State.RUNNING
ti2.state = State.RUNNING
session.flush()
assert State.RUNNING == dr1.state
assert 2 == DAG.get_num_task_instances(
dag_id, task_ids=dag.task_ids, states=[State.RUNNING], session=session
)
# create second dag run
dr2 = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
ti3 = dr2.get_task_instance(task1.task_id, session)
ti4 = dr2.get_task_instance(task2.task_id, session)
# manually set to scheduled so we can pick them up
ti3.state = State.SCHEDULED
ti4.state = State.SCHEDULED
session.flush()
assert State.RUNNING == dr2.state
res = self.job_runner._critical_section_enqueue_task_instances(session)
# check that max_active_tasks is respected
ti1.refresh_from_db()
ti2.refresh_from_db()
ti3.refresh_from_db()
ti4.refresh_from_db()
assert 3 == DAG.get_num_task_instances(
dag_id, task_ids=dag.task_ids, states=[State.RUNNING, State.QUEUED], session=session
)
assert State.RUNNING == ti1.state
assert State.RUNNING == ti2.state
assert {State.QUEUED, State.SCHEDULED} == {ti3.state, ti4.state}
assert 1 == res
def test_execute_task_instances_limit(self, dag_maker):
dag_id = "SchedulerJobTest.test_execute_task_instances_limit"
task_id_1 = "dummy_task"
task_id_2 = "dummy_task_2"
session = settings.Session()
# important that len(tasks) is less than max_active_tasks
# because before scheduler._execute_task_instances would only
# check the num tasks once so if max_active_tasks was 3,
# we could execute arbitrarily many tasks in the second run
with dag_maker(dag_id=dag_id, max_active_tasks=16, session=session):
task1 = EmptyOperator(task_id=task_id_1)
task2 = EmptyOperator(task_id=task_id_2)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
def _create_dagruns():
dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.RUNNING)
yield dagrun
for _ in range(3):
dagrun = dag_maker.create_dagrun_after(
dagrun,
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
)
yield dagrun
tis = []
for dr in _create_dagruns():
ti1 = dr.get_task_instance(task1.task_id, session)
ti2 = dr.get_task_instance(task2.task_id, session)
ti1.state = State.SCHEDULED
ti2.state = State.SCHEDULED
session.flush()
scheduler_job.max_tis_per_query = 2
res = self.job_runner._critical_section_enqueue_task_instances(session)
assert 2 == res
scheduler_job.max_tis_per_query = 8
with mock.patch.object(
type(scheduler_job.executor), "slots_available", new_callable=mock.PropertyMock
) as mock_slots:
mock_slots.return_value = 2
# Check that we don't "overfill" the executor
assert 2 == res
res = self.job_runner._critical_section_enqueue_task_instances(session)
res = self.job_runner._critical_section_enqueue_task_instances(session)
assert 4 == res
for ti in tis:
ti.refresh_from_db()
assert State.QUEUED == ti.state
def test_execute_task_instances_unlimited(self, dag_maker):
"""Test that max_tis_per_query=0 is unlimited"""
dag_id = "SchedulerJobTest.test_execute_task_instances_unlimited"
task_id_1 = "dummy_task"
task_id_2 = "dummy_task_2"
session = settings.Session()
with dag_maker(dag_id=dag_id, max_active_tasks=1024, session=session):
task1 = EmptyOperator(task_id=task_id_1)
task2 = EmptyOperator(task_id=task_id_2)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
def _create_dagruns():
dagrun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.RUNNING)
yield dagrun
for _ in range(19):
dagrun = dag_maker.create_dagrun_after(
dagrun,
run_type=DagRunType.SCHEDULED,
state=State.RUNNING,
)
yield dagrun
for dr in _create_dagruns():
ti1 = dr.get_task_instance(task1.task_id, session)
ti2 = dr.get_task_instance(task2.task_id, session)
ti1.state = State.SCHEDULED
ti2.state = State.SCHEDULED
session.flush()
scheduler_job.max_tis_per_query = 0
scheduler_job.executor = MagicMock(slots_available=36)
res = self.job_runner._critical_section_enqueue_task_instances(session)
# 20 dag runs * 2 tasks each = 40, but limited by number of slots available
assert res == 36
session.rollback()
def test_adopt_or_reset_orphaned_tasks(self, dag_maker):
session = settings.Session()
with dag_maker("test_execute_helper_reset_orphaned_tasks") as dag:
op1 = EmptyOperator(task_id="op1")
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dr = dag_maker.create_dagrun()
dr2 = dag.create_dagrun(
run_type=DagRunType.BACKFILL_JOB,
state=State.RUNNING,
execution_date=DEFAULT_DATE + datetime.timedelta(1),
start_date=DEFAULT_DATE,
session=session,
data_interval=data_interval,
)
scheduler_job = Job()
session.add(scheduler_job)
session.commit()
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
ti.state = State.QUEUED
ti.queued_by_job_id = scheduler_job.id
ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
ti2.state = State.QUEUED
ti2.queued_by_job_id = scheduler_job.id
session.commit()
processor = mock.MagicMock()
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
self.job_runner.processor_agent = processor
self.job_runner.adopt_or_reset_orphaned_tasks()
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
assert ti.state == State.NONE
ti2 = dr2.get_task_instance(task_id=op1.task_id, session=session)
assert ti2.state == State.QUEUED, "Tasks run by Backfill Jobs should not be reset"
def test_fail_stuck_queued_tasks(self, dag_maker, session):
with dag_maker("test_fail_stuck_queued_tasks"):
op1 = EmptyOperator(task_id="op1")
dr = dag_maker.create_dagrun()
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
ti.state = State.QUEUED
ti.queued_dttm = timezone.utcnow() - timedelta(minutes=15)
session.commit()
executor = MagicMock()
executor.cleanup_stuck_queued_tasks = mock.MagicMock()
scheduler_job = Job(executor=executor)
job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
job_runner._task_queued_timeout = 300
job_runner._fail_tasks_stuck_in_queued()
job_runner.job.executor.cleanup_stuck_queued_tasks.assert_called_once()
def test_fail_stuck_queued_tasks_raises_not_implemented(self, dag_maker, session, caplog):
with dag_maker("test_fail_stuck_queued_tasks"):
op1 = EmptyOperator(task_id="op1")
dr = dag_maker.create_dagrun()
ti = dr.get_task_instance(task_id=op1.task_id, session=session)
ti.state = State.QUEUED
ti.queued_dttm = timezone.utcnow() - timedelta(minutes=15)
session.commit()
from airflow.executors.local_executor import LocalExecutor
scheduler_job = Job(executor=LocalExecutor())
job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=0)
job_runner._task_queued_timeout = 300
with caplog.at_level(logging.DEBUG):
job_runner._fail_tasks_stuck_in_queued()
assert "Executor doesn't support cleanup of stuck queued tasks. Skipping." in caplog.text
@mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
def test_executor_end_called(self, mock_processor_agent):
"""
Test to make sure executor.end gets called with a successful scheduler loop run
"""
scheduler_job = Job(executor=mock.MagicMock(slots_available=8))
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
run_job(scheduler_job, execute_callable=self.job_runner._execute)
scheduler_job.executor.end.assert_called_once()
self.job_runner.processor_agent.end.assert_called_once()
@mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
@mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
@mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
def test_executor_end_called_multiple_executors(
self, mock_processor_agent, executor_mock, executors_mock
):
"""
Test to make sure executor.end gets called on all executors with a successful scheduler loop run
"""
default_executor = mock.MagicMock(slots_available=8)
second_executor = mock.MagicMock(slots_available=8)
executor_mock.return_value = default_executor
executors_mock.return_value = [default_executor, second_executor]
scheduler_job = Job()
assert scheduler_job.executor is default_executor
assert scheduler_job.executors == [default_executor, second_executor]
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
run_job(scheduler_job, execute_callable=self.job_runner._execute)
scheduler_job.executor.end.assert_called_once()
for executor in scheduler_job.executors:
executor.end.assert_called_once()
self.job_runner.processor_agent.end.assert_called_once()
@mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
def test_cleanup_methods_all_called(self, mock_processor_agent):
"""
Test to make sure all cleanup methods are called when the scheduler loop has an exception
"""
scheduler_job = Job(executor=mock.MagicMock(slots_available=8))
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
self.job_runner._run_scheduler_loop = mock.MagicMock(side_effect=RuntimeError("oops"))
mock_processor_agent.return_value.end.side_effect = RuntimeError("double oops")
scheduler_job.executor.end = mock.MagicMock(side_effect=RuntimeError("triple oops"))
with pytest.raises(RuntimeError, match="oops"):
run_job(scheduler_job, execute_callable=self.job_runner._execute)
self.job_runner.processor_agent.end.assert_called_once()
scheduler_job.executor.end.assert_called_once()
mock_processor_agent.return_value.end.reset_mock(side_effect=True)
@mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock)
@mock.patch("airflow.jobs.job.Job.executor", new_callable=PropertyMock)
@mock.patch("airflow.dag_processing.manager.DagFileProcessorAgent")
def test_cleanup_methods_all_called_multiple_executors(
self, mock_processor_agent, executor_mock, executors_mock
):
"""
Test to make sure all cleanup methods are called when the scheduler loop has an exception
"""
default_executor = mock.MagicMock(slots_available=8)
second_executor = mock.MagicMock(slots_available=8)
executor_mock.return_value = default_executor
executors_mock.return_value = [default_executor, second_executor]
scheduler_job = Job()
assert scheduler_job.executor is default_executor
assert scheduler_job.executors == [default_executor, second_executor]
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull, num_runs=1)
self.job_runner._run_scheduler_loop = mock.MagicMock(side_effect=RuntimeError("oops"))
mock_processor_agent.return_value.end.side_effect = RuntimeError("double oops")
scheduler_job.executor.end = mock.MagicMock(side_effect=RuntimeError("triple oops"))
with pytest.raises(RuntimeError, match="oops"):
run_job(scheduler_job, execute_callable=self.job_runner._execute)
self.job_runner.processor_agent.end.assert_called_once()
for executor in scheduler_job.executors:
executor.end.assert_called_once()
mock_processor_agent.return_value.end.reset_mock(side_effect=True)
def test_queued_dagruns_stops_creating_when_max_active_is_reached(self, dag_maker):
"""This tests that queued dagruns stops creating once max_active_runs is reached"""
with dag_maker(max_active_runs=10) as dag:
EmptyOperator(task_id="mytask")
session = settings.Session()
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner.processor_agent = mock.MagicMock()
self.job_runner.dagbag = dag_maker.dagbag
session = settings.Session()
orm_dag = session.get(DagModel, dag.dag_id)
assert orm_dag is not None
for _ in range(20):
self.job_runner._create_dag_runs([orm_dag], session)
drs = session.query(DagRun).all()
assert len(drs) == 10
for dr in drs:
dr.state = State.RUNNING
session.merge(dr)
session.commit()
assert session.query(DagRun.state).filter(DagRun.state == State.RUNNING).count() == 10
for _ in range(20):
self.job_runner._create_dag_runs([orm_dag], session)
assert session.query(DagRun).count() == 10
assert session.query(DagRun.state).filter(DagRun.state == State.RUNNING).count() == 10
assert session.query(DagRun.state).filter(DagRun.state == State.QUEUED).count() == 0
assert orm_dag.next_dagrun_create_after is None
def test_runs_are_created_after_max_active_runs_was_reached(self, dag_maker, session):
"""
Test that when creating runs once max_active_runs is reached the runs does not stick
"""
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor(do_update=True)
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
with dag_maker(max_active_runs=1, session=session) as dag:
# Need to use something that doesn't immediately get marked as success by the scheduler
BashOperator(task_id="task", bash_command="true")
dag_run = dag_maker.create_dagrun(state=State.RUNNING, session=session, run_type=DagRunType.SCHEDULED)
# Reach max_active_runs
for _ in range(3):
self.job_runner._do_scheduling(session)
# Complete dagrun
# Add dag_run back in to the session (_do_scheduling does an expunge_all)
dag_run = session.merge(dag_run)
session.refresh(dag_run)
dag_run.get_task_instance(task_id="task", session=session).state = State.SUCCESS
# create new run
for _ in range(3):
self.job_runner._do_scheduling(session)
# Assert that new runs has created
dag_runs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(dag_runs) == 2
def test_dagrun_timeout_verify_max_active_runs(self, dag_maker):
"""
Test if a a dagrun will not be scheduled if max_dag_runs
has been reached and dagrun_timeout is not reached
Test if a a dagrun would be scheduled if max_dag_runs has
been reached but dagrun_timeout is also reached
"""
with dag_maker(
dag_id="test_scheduler_verify_max_active_runs_and_dagrun_timeout",
start_date=DEFAULT_DATE,
max_active_runs=1,
processor_subdir=TEST_DAG_FOLDER,
dagrun_timeout=datetime.timedelta(seconds=60),
) as dag:
EmptyOperator(task_id="dummy")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.dagbag = dag_maker.dagbag
session = settings.Session()
orm_dag = session.get(DagModel, dag.dag_id)
assert orm_dag is not None
self.job_runner._create_dag_runs([orm_dag], session)
self.job_runner._start_queued_dagruns(session)
drs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(drs) == 1
dr = drs[0]
assert orm_dag.next_dagrun_create_after is None
# But we should record the date of _what run_ it would be
assert isinstance(orm_dag.next_dagrun, datetime.datetime)
assert isinstance(orm_dag.next_dagrun_data_interval_start, datetime.datetime)
assert isinstance(orm_dag.next_dagrun_data_interval_end, datetime.datetime)
# Should be scheduled as dagrun_timeout has passed
dr.start_date = timezone.utcnow() - datetime.timedelta(days=1)
session.flush()
# Mock that processor_agent is started
self.job_runner.processor_agent = mock.Mock()
callback = self.job_runner._schedule_dag_run(dr, session)
session.flush()
session.refresh(dr)
assert dr.state == State.FAILED
session.refresh(orm_dag)
assert isinstance(orm_dag.next_dagrun, datetime.datetime)
assert isinstance(orm_dag.next_dagrun_data_interval_start, datetime.datetime)
assert isinstance(orm_dag.next_dagrun_data_interval_end, datetime.datetime)
assert isinstance(orm_dag.next_dagrun_create_after, datetime.datetime)
expected_callback = DagCallbackRequest(
full_filepath=dr.dag.fileloc,
dag_id=dr.dag_id,
is_failure_callback=True,
run_id=dr.run_id,
processor_subdir=TEST_DAG_FOLDER,
msg="timed_out",
)
# Verify dag failure callback request is sent
assert callback == expected_callback
session.rollback()
session.close()
def test_dagrun_timeout_fails_run(self, dag_maker):
"""
Test if a a dagrun will be set failed if timeout, even without max_active_runs
"""
session = settings.Session()
with dag_maker(
dag_id="test_scheduler_fail_dagrun_timeout",
dagrun_timeout=datetime.timedelta(seconds=60),
processor_subdir=TEST_DAG_FOLDER,
session=session,
):
EmptyOperator(task_id="dummy")
dr = dag_maker.create_dagrun(start_date=timezone.utcnow() - datetime.timedelta(days=1))
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.dagbag = dag_maker.dagbag
# Mock that processor_agent is started
self.job_runner.processor_agent = mock.Mock()
callback = self.job_runner._schedule_dag_run(dr, session)
session.flush()
session.refresh(dr)
assert dr.state == State.FAILED
expected_callback = DagCallbackRequest(
full_filepath=dr.dag.fileloc,
dag_id=dr.dag_id,
is_failure_callback=True,
run_id=dr.run_id,
processor_subdir=TEST_DAG_FOLDER,
msg="timed_out",
)
# Verify dag failure callback request is sent
assert callback == expected_callback
session.rollback()
session.close()
def test_dagrun_timeout_fails_run_and_update_next_dagrun(self, dag_maker):
"""
Test that dagrun timeout fails run and update the next dagrun
"""
session = settings.Session()
with dag_maker(
max_active_runs=1,
dag_id="test_scheduler_fail_dagrun_timeout",
dagrun_timeout=datetime.timedelta(seconds=60),
):
EmptyOperator(task_id="dummy")
dr = dag_maker.create_dagrun(start_date=timezone.utcnow() - datetime.timedelta(days=1))
# check that next_dagrun is dr.execution_date
dag_maker.dag_model.next_dagrun == dr.execution_date
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.dagbag = dag_maker.dagbag
scheduler_job.executor = MockExecutor()
# Mock that processor_agent is started
self.job_runner.processor_agent = mock.Mock()
self.job_runner._schedule_dag_run(dr, session)
session.flush()
session.refresh(dr)
assert dr.state == State.FAILED
# check that next_dagrun_create_after has been updated by calculate_dagrun_date_fields
assert dag_maker.dag_model.next_dagrun_create_after == dr.execution_date + timedelta(days=1)
# check that no running/queued runs yet
assert (
session.query(DagRun).filter(DagRun.state.in_([DagRunState.RUNNING, DagRunState.QUEUED])).count()
== 0
)
@pytest.mark.parametrize(
"state, expected_callback_msg", [(State.SUCCESS, "success"), (State.FAILED, "task_failure")]
)
def test_dagrun_callbacks_are_called(self, state, expected_callback_msg, dag_maker):
"""
Test if DagRun is successful, and if Success callbacks is defined, it is sent to DagFileProcessor.
"""
with dag_maker(
dag_id="test_dagrun_callbacks_are_called",
on_success_callback=lambda x: print("success"),
on_failure_callback=lambda x: print("failed"),
processor_subdir=TEST_DAG_FOLDER,
) as dag:
EmptyOperator(task_id="dummy")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner.dagbag = dag_maker.dagbag
self.job_runner.processor_agent = mock.Mock()
session = settings.Session()
dr = dag_maker.create_dagrun()
ti = dr.get_task_instance("dummy")
ti.set_state(state, session)
with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
self.job_runner._do_scheduling(session)
expected_callback = DagCallbackRequest(
full_filepath=dag.fileloc,
dag_id=dr.dag_id,
is_failure_callback=bool(state == State.FAILED),
run_id=dr.run_id,
processor_subdir=TEST_DAG_FOLDER,
msg=expected_callback_msg,
)
# Verify dag failure callback request is sent to file processor
scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)
session.rollback()
session.close()
@pytest.mark.parametrize(
"state, expected_callback_msg", [(State.SUCCESS, "success"), (State.FAILED, "task_failure")]
)
def test_dagrun_plugins_are_notified(self, state, expected_callback_msg, dag_maker):
"""
Test if DagRun is successful, and if Success callbacks is defined, it is sent to DagFileProcessor.
"""
with dag_maker(
dag_id="test_dagrun_callbacks_are_called",
on_success_callback=lambda x: print("success"),
on_failure_callback=lambda x: print("failed"),
processor_subdir=TEST_DAG_FOLDER,
):
EmptyOperator(task_id="dummy")
dag_listener.clear()
get_listener_manager().add_listener(dag_listener)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner.dagbag = dag_maker.dagbag
self.job_runner.processor_agent = mock.Mock()
session = settings.Session()
dr = dag_maker.create_dagrun()
ti = dr.get_task_instance("dummy")
ti.set_state(state, session)
with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
self.job_runner._do_scheduling(session)
assert len(dag_listener.success) or len(dag_listener.failure)
dag_listener.success = []
dag_listener.failure = []
session.rollback()
session.close()
def test_dagrun_timeout_callbacks_are_stored_in_database(self, dag_maker, session):
with dag_maker(
dag_id="test_dagrun_timeout_callbacks_are_stored_in_database",
on_failure_callback=lambda x: print("failed"),
dagrun_timeout=timedelta(hours=1),
processor_subdir=TEST_DAG_FOLDER,
) as dag:
EmptyOperator(task_id="empty")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
scheduler_job.executor.callback_sink = DatabaseCallbackSink()
self.job_runner.dagbag = dag_maker.dagbag
self.job_runner.processor_agent = mock.Mock()
dr = dag_maker.create_dagrun(start_date=DEFAULT_DATE)
with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
self.job_runner._do_scheduling(session)
callback = (
session.query(DbCallbackRequest)
.order_by(DbCallbackRequest.id.desc())
.first()
.get_callback_request()
)
expected_callback = DagCallbackRequest(
full_filepath=dag.fileloc,
dag_id=dr.dag_id,
is_failure_callback=True,
run_id=dr.run_id,
processor_subdir=TEST_DAG_FOLDER,
msg="timed_out",
)
assert callback == expected_callback
def test_dagrun_callbacks_commited_before_sent(self, dag_maker):
"""
Tests that before any callbacks are sent to the processor, the session is committed. This ensures
that the dagrun details are up to date when the callbacks are run.
"""
with dag_maker(dag_id="test_dagrun_callbacks_commited_before_sent"):
EmptyOperator(task_id="dummy")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.Mock()
self.job_runner._send_dag_callbacks_to_processor = mock.Mock()
self.job_runner._schedule_dag_run = mock.Mock()
dr = dag_maker.create_dagrun()
session = settings.Session()
ti = dr.get_task_instance("dummy")
ti.set_state(State.SUCCESS, session)
with mock.patch.object(settings, "USE_JOB_SCHEDULE", False), mock.patch(
"airflow.jobs.scheduler_job_runner.prohibit_commit"
) as mock_guard:
mock_guard.return_value.__enter__.return_value.commit.side_effect = session.commit
def mock_schedule_dag_run(*args, **kwargs):
mock_guard.reset_mock()
return None
def mock_send_dag_callbacks_to_processor(*args, **kwargs):
mock_guard.return_value.__enter__.return_value.commit.assert_called()
self.job_runner._send_dag_callbacks_to_processor.side_effect = (
mock_send_dag_callbacks_to_processor
)
self.job_runner._schedule_dag_run.side_effect = mock_schedule_dag_run
self.job_runner._do_scheduling(session)
# Verify dag failure callback request is sent to file processor
self.job_runner._send_dag_callbacks_to_processor.assert_called_once()
# and mock_send_dag_callbacks_to_processor has asserted the callback was sent after a commit
session.rollback()
session.close()
@pytest.mark.parametrize("state", [State.SUCCESS, State.FAILED])
def test_dagrun_callbacks_are_not_added_when_callbacks_are_not_defined(self, state, dag_maker):
"""
Test if no on_*_callback are defined on DAG, Callbacks not registered and sent to DAG Processor
"""
with dag_maker(
dag_id="test_dagrun_callbacks_are_not_added_when_callbacks_are_not_defined",
):
BashOperator(task_id="test_task", bash_command="echo hi")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.Mock()
self.job_runner._send_dag_callbacks_to_processor = mock.Mock()
session = settings.Session()
dr = dag_maker.create_dagrun()
ti = dr.get_task_instance("test_task")
ti.set_state(state, session)
with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
self.job_runner._do_scheduling(session)
# Verify Callback is not set (i.e is None) when no callbacks are set on DAG
self.job_runner._send_dag_callbacks_to_processor.assert_called_once()
call_args = self.job_runner._send_dag_callbacks_to_processor.call_args.args
assert call_args[0].dag_id == dr.dag_id
assert call_args[1] is None
session.rollback()
session.close()
@pytest.mark.parametrize("state, msg", [[State.SUCCESS, "success"], [State.FAILED, "task_failure"]])
def test_dagrun_callbacks_are_added_when_callbacks_are_defined(self, state, msg, dag_maker):
"""
Test if on_*_callback are defined on DAG, Callbacks ARE registered and sent to DAG Processor
"""
with dag_maker(
dag_id="test_dagrun_callbacks_are_added_when_callbacks_are_defined",
on_failure_callback=lambda: True,
on_success_callback=lambda: True,
):
BashOperator(task_id="test_task", bash_command="echo hi")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.Mock()
self.job_runner._send_dag_callbacks_to_processor = mock.Mock()
session = settings.Session()
dr = dag_maker.create_dagrun()
ti = dr.get_task_instance("test_task")
ti.set_state(state, session)
with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
self.job_runner._do_scheduling(session)
# Verify Callback is set (i.e is None) when no callbacks are set on DAG
self.job_runner._send_dag_callbacks_to_processor.assert_called_once()
call_args = self.job_runner._send_dag_callbacks_to_processor.call_args.args
assert call_args[0].dag_id == dr.dag_id
assert call_args[1] is not None
assert call_args[1].msg == msg
session.rollback()
session.close()
def test_dagrun_notify_called_success(self, dag_maker):
with dag_maker(
dag_id="test_dagrun_notify_called",
on_success_callback=lambda x: print("success"),
on_failure_callback=lambda x: print("failed"),
processor_subdir=TEST_DAG_FOLDER,
):
EmptyOperator(task_id="dummy")
dag_listener.clear()
get_listener_manager().add_listener(dag_listener)
executor = MockExecutor(do_update=False)
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(scheduler_job)
self.job_runner.dagbag = dag_maker.dagbag
self.job_runner.processor_agent = mock.MagicMock()
session = settings.Session()
dr = dag_maker.create_dagrun()
ti = dr.get_task_instance("dummy")
ti.set_state(State.SUCCESS, session)
with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
self.job_runner._do_scheduling(session)
assert dag_listener.success[0].dag_id == dr.dag_id
assert dag_listener.success[0].run_id == dr.run_id
assert dag_listener.success[0].state == DagRunState.SUCCESS
def test_do_not_schedule_removed_task(self, dag_maker):
schedule_interval = datetime.timedelta(days=1)
with dag_maker(
dag_id="test_scheduler_do_not_schedule_removed_task",
schedule=schedule_interval,
):
EmptyOperator(task_id="dummy")
session = settings.Session()
dr = dag_maker.create_dagrun()
assert dr is not None
# Re-create the DAG, but remove the task
# Delete DagModel first to avoid duplicate record
session.query(DagModel).delete()
with dag_maker(
dag_id="test_scheduler_do_not_schedule_removed_task",
schedule=schedule_interval,
start_date=DEFAULT_DATE + schedule_interval,
):
pass
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
assert [] == res
@provide_session
def evaluate_dagrun(
self,
dag_id,
expected_task_states, # dict of task_id: state
dagrun_state,
run_kwargs=None,
advance_execution_date=False,
session=None,
):
"""
Helper for testing DagRun states with simple two-task DAGs.
This is hackish: a dag run is created but its tasks are
run by a backfill.
"""
if run_kwargs is None:
run_kwargs = {}
dag = self.dagbag.get_dag(dag_id)
dagrun_info = dag.next_dagrun_info(None)
assert dagrun_info is not None
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=dagrun_info.logical_date,
state=None,
session=session,
data_interval=data_interval,
)
if advance_execution_date:
# run a second time to schedule a dagrun after the start_date
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=dr.data_interval_end,
state=None,
session=session,
data_interval=data_interval,
)
ex_date = dr.execution_date
for tid, state in expected_task_states.items():
if state == State.FAILED:
self.null_exec.mock_task_fail(dag_id, tid, dr.run_id)
try:
dag = DagBag().get_dag(dag.dag_id)
# This needs a _REAL_ dag, not the serialized version
assert not isinstance(dag, SerializedDAG)
# TODO: Can this be replaced with `self.run_scheduler_until_dagrun_terminal. `dag.run` isn't
# great to use here as it uses BackfillJobRunner!
dag.run(start_date=ex_date, end_date=ex_date, executor=self.null_exec, **run_kwargs)
except AirflowException:
pass
# load dagrun
dr = DagRun.find(dag_id=dag_id, execution_date=ex_date, session=session)
dr = dr[0]
dr.dag = dag
assert dr.state == dagrun_state
# test tasks
for task_id, expected_state in expected_task_states.items():
ti = dr.get_task_instance(task_id)
assert ti.state == expected_state
def test_dagrun_fail(self):
"""
DagRuns with one failed and one incomplete root task -> FAILED
"""
self.evaluate_dagrun(
dag_id="test_dagrun_states_fail",
expected_task_states={
"test_dagrun_fail": State.FAILED,
"test_dagrun_succeed": State.UPSTREAM_FAILED,
},
dagrun_state=State.FAILED,
)
def test_dagrun_success(self):
"""
DagRuns with one failed and one successful root task -> SUCCESS
"""
self.evaluate_dagrun(
dag_id="test_dagrun_states_success",
expected_task_states={
"test_dagrun_fail": State.FAILED,
"test_dagrun_succeed": State.SUCCESS,
},
dagrun_state=State.SUCCESS,
)
def test_dagrun_root_fail(self):
"""
DagRuns with one successful and one failed root task -> FAILED
"""
self.evaluate_dagrun(
dag_id="test_dagrun_states_root_fail",
expected_task_states={
"test_dagrun_succeed": State.SUCCESS,
"test_dagrun_fail": State.FAILED,
},
dagrun_state=State.FAILED,
)
def test_dagrun_root_fail_unfinished(self):
"""
DagRuns with one unfinished and one failed root task -> RUNNING
"""
# TODO: this should live in test_dagrun.py
# Run both the failed and successful tasks
dag_id = "test_dagrun_states_root_fail_unfinished"
dag = self.dagbag.get_dag(dag_id)
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dr = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
state=None,
data_interval=data_interval,
)
self.null_exec.mock_task_fail(dag_id, "test_dagrun_fail", dr.run_id)
with pytest.raises(AirflowException):
dag.run(start_date=dr.execution_date, end_date=dr.execution_date, executor=self.null_exec)
# Mark the successful task as never having run since we want to see if the
# dagrun will be in a running state despite having an unfinished task.
with create_session() as session:
ti = dr.get_task_instance("test_dagrun_unfinished", session=session)
ti.state = State.NONE
session.commit()
dr.update_state()
assert dr.state == State.RUNNING
def test_dagrun_root_after_dagrun_unfinished(self):
"""
DagRuns with one successful and one future root task -> SUCCESS
Noted: the DagRun state could be still in running state during CI.
"""
clear_db_dags()
dag_id = "test_dagrun_states_root_future"
dag = self.dagbag.get_dag(dag_id)
dag.sync_to_db()
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=1, subdir=dag.fileloc)
run_job(scheduler_job, execute_callable=self.job_runner._execute)
first_run = DagRun.find(dag_id=dag_id, execution_date=DEFAULT_DATE)[0]
ti_ids = [(ti.task_id, ti.state) for ti in first_run.get_task_instances()]
assert ti_ids == [("current", State.SUCCESS)]
assert first_run.state in [State.SUCCESS, State.RUNNING]
def test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date(self):
"""
DagRun is marked a success if ignore_first_depends_on_past=True
Test that an otherwise-deadlocked dagrun is marked as a success
if ignore_first_depends_on_past=True and the dagrun execution_date
is after the start_date.
"""
self.evaluate_dagrun(
dag_id="test_dagrun_states_deadlock",
expected_task_states={
"test_depends_on_past": State.SUCCESS,
"test_depends_on_past_2": State.SUCCESS,
},
dagrun_state=State.SUCCESS,
advance_execution_date=True,
run_kwargs=dict(ignore_first_depends_on_past=True),
)
def test_dagrun_deadlock_ignore_depends_on_past(self):
"""
Test that ignore_first_depends_on_past doesn't affect results
(this is the same test as
test_dagrun_deadlock_ignore_depends_on_past_advance_ex_date except
that start_date == execution_date so depends_on_past is irrelevant).
"""
self.evaluate_dagrun(
dag_id="test_dagrun_states_deadlock",
expected_task_states={
"test_depends_on_past": State.SUCCESS,
"test_depends_on_past_2": State.SUCCESS,
},
dagrun_state=State.SUCCESS,
run_kwargs=dict(ignore_first_depends_on_past=True),
)
@pytest.mark.parametrize(
"configs",
[
{("scheduler", "standalone_dag_processor"): "False"},
{("scheduler", "standalone_dag_processor"): "True"},
],
)
def test_scheduler_start_date(self, configs):
"""
Test that the scheduler respects start_dates, even when DAGs have run
"""
with conf_vars(configs):
with create_session() as session:
dag_id = "test_start_date_scheduling"
dag = self.dagbag.get_dag(dag_id)
dag.clear()
assert dag.start_date > datetime.datetime.now(timezone.utc)
# Deactivate other dags in this file
other_dag = self.dagbag.get_dag("test_task_start_date_scheduling")
other_dag.is_paused_upon_creation = True
other_dag.sync_to_db()
scheduler_job = Job(
executor=self.null_exec,
)
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=dag.fileloc, num_runs=1)
run_job(scheduler_job, execute_callable=self.job_runner._execute)
# zero tasks ran
assert len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()) == 0
session.commit()
assert [] == self.null_exec.sorted_tasks
# previously, running this backfill would kick off the Scheduler
# because it would take the most recent run and start from there
# That behavior still exists, but now it will only do so if after the
# start date
bf_exec = MockExecutor()
backfill_job = Job(executor=bf_exec)
job_runner = BackfillJobRunner(
job=backfill_job, dag=dag, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE
)
run_job(job=backfill_job, execute_callable=job_runner._execute)
# one task ran
assert len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()) == 1
assert [
(
TaskInstanceKey(dag.dag_id, "dummy", f"backfill__{DEFAULT_DATE.isoformat()}", 1),
(State.SUCCESS, None),
),
] == bf_exec.sorted_tasks
session.commit()
scheduler_job = Job(
executor=self.null_exec,
)
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=dag.fileloc, num_runs=1)
run_job(scheduler_job, execute_callable=self.job_runner._execute)
# still one task
assert len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()) == 1
session.commit()
assert [] == self.null_exec.sorted_tasks
@pytest.mark.parametrize(
"configs",
[
{("scheduler", "standalone_dag_processor"): "False"},
{("scheduler", "standalone_dag_processor"): "True"},
],
)
def test_scheduler_task_start_date(self, configs):
"""
Test that the scheduler respects task start dates that are different from DAG start dates
"""
with conf_vars(configs):
dagbag = DagBag(
dag_folder=os.path.join(settings.DAGS_FOLDER, "test_scheduler_dags.py"),
include_examples=False,
)
dag_id = "test_task_start_date_scheduling"
dag = self.dagbag.get_dag(dag_id)
dag.is_paused_upon_creation = False
dagbag.bag_dag(dag=dag, root_dag=dag)
# Deactivate other dags in this file so the scheduler doesn't waste time processing them
other_dag = self.dagbag.get_dag("test_start_date_scheduling")
other_dag.is_paused_upon_creation = True
dagbag.bag_dag(dag=other_dag, root_dag=other_dag)
dagbag.sync_to_db()
scheduler_job = Job(
executor=self.null_exec,
)
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=dag.fileloc, num_runs=3)
run_job(scheduler_job, execute_callable=self.job_runner._execute)
session = settings.Session()
tiq = session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id)
ti1s = tiq.filter(TaskInstance.task_id == "dummy1").all()
ti2s = tiq.filter(TaskInstance.task_id == "dummy2").all()
assert len(ti1s) == 0
assert len(ti2s) >= 2
for task in ti2s:
assert task.state == State.SUCCESS
@pytest.mark.parametrize(
"configs",
[
{("scheduler", "standalone_dag_processor"): "False"},
{("scheduler", "standalone_dag_processor"): "True"},
],
)
def test_scheduler_multiprocessing(self, configs):
"""
Test that the scheduler can successfully queue multiple dags in parallel
"""
with conf_vars(configs):
dag_ids = ["test_start_date_scheduling", "test_dagrun_states_success"]
for dag_id in dag_ids:
dag = self.dagbag.get_dag(dag_id)
dag.clear()
scheduler_job = Job(
executor=self.null_exec,
)
self.job_runner = SchedulerJobRunner(
job=scheduler_job,
subdir=os.path.join(TEST_DAG_FOLDER, "test_scheduler_dags.py"),
num_runs=1,
)
run_job(scheduler_job, execute_callable=self.job_runner._execute)
# zero tasks ran
dag_id = "test_start_date_scheduling"
session = settings.Session()
assert len(session.query(TaskInstance).filter(TaskInstance.dag_id == dag_id).all()) == 0
@pytest.mark.parametrize(
"configs",
[
{("scheduler", "standalone_dag_processor"): "False"},
{("scheduler", "standalone_dag_processor"): "True"},
],
)
def test_scheduler_verify_pool_full(self, dag_maker, configs):
"""
Test task instances not queued when pool is full
"""
with conf_vars(configs):
with dag_maker(dag_id="test_scheduler_verify_pool_full"):
BashOperator(
task_id="dummy",
pool="test_scheduler_verify_pool_full",
bash_command="echo hi",
)
session = settings.Session()
pool = Pool(pool="test_scheduler_verify_pool_full", slots=1, include_deferred=False)
session.add(pool)
session.flush()
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
# Create 2 dagruns, which will create 2 task instances.
dr = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
)
self.job_runner._schedule_dag_run(dr, session)
dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, state=State.RUNNING)
self.job_runner._schedule_dag_run(dr, session)
session.flush()
task_instances_list = self.job_runner._executable_task_instances_to_queued(
max_tis=32, session=session
)
assert len(task_instances_list) == 1
@pytest.mark.need_serialized_dag
def test_scheduler_verify_pool_full_2_slots_per_task(self, dag_maker, session):
"""
Test task instances not queued when pool is full.
Variation with non-default pool_slots
"""
with dag_maker(
dag_id="test_scheduler_verify_pool_full_2_slots_per_task",
start_date=DEFAULT_DATE,
session=session,
):
BashOperator(
task_id="dummy",
pool="test_scheduler_verify_pool_full_2_slots_per_task",
pool_slots=2,
bash_command="echo hi",
)
pool = Pool(pool="test_scheduler_verify_pool_full_2_slots_per_task", slots=6, include_deferred=False)
session.add(pool)
session.flush()
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
# Create 5 dagruns, which will create 5 task instances.
def _create_dagruns():
dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
yield dr
for _ in range(4):
dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED)
yield dr
for dr in _create_dagruns():
self.job_runner._schedule_dag_run(dr, session)
task_instances_list = self.job_runner._executable_task_instances_to_queued(
max_tis=32, session=session
)
# As tasks require 2 slots, only 3 can fit into 6 available
assert len(task_instances_list) == 3
def test_scheduler_keeps_scheduling_pool_full(self, dag_maker):
"""
Test task instances in a pool that isn't full keep getting scheduled even when a pool is full.
"""
with dag_maker(
dag_id="test_scheduler_keeps_scheduling_pool_full_d1",
start_date=DEFAULT_DATE,
):
BashOperator(
task_id="test_scheduler_keeps_scheduling_pool_full_t1",
pool="test_scheduler_keeps_scheduling_pool_full_p1",
bash_command="echo hi",
)
dag_d1 = dag_maker.dag
with dag_maker(
dag_id="test_scheduler_keeps_scheduling_pool_full_d2",
start_date=DEFAULT_DATE,
):
BashOperator(
task_id="test_scheduler_keeps_scheduling_pool_full_t2",
pool="test_scheduler_keeps_scheduling_pool_full_p2",
bash_command="echo hi",
)
dag_d2 = dag_maker.dag
session = settings.Session()
pool_p1 = Pool(pool="test_scheduler_keeps_scheduling_pool_full_p1", slots=1, include_deferred=False)
pool_p2 = Pool(pool="test_scheduler_keeps_scheduling_pool_full_p2", slots=10, include_deferred=False)
session.add(pool_p1)
session.add(pool_p2)
session.flush()
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
def _create_dagruns(dag: DAG):
next_info = dag.next_dagrun_info(None)
assert next_info is not None
for _ in range(30):
yield dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=next_info.logical_date,
data_interval=next_info.data_interval,
state=DagRunState.RUNNING,
)
next_info = dag.next_dagrun_info(next_info.data_interval)
if next_info is None:
break
# Create 30 dagruns for each DAG.
# To increase the chances the TIs from the "full" pool will get retrieved first, we schedule all
# TIs from the first dag first.
for dr in _create_dagruns(dag_d1):
self.job_runner._schedule_dag_run(dr, session)
for dr in _create_dagruns(dag_d2):
self.job_runner._schedule_dag_run(dr, session)
self.job_runner._executable_task_instances_to_queued(max_tis=2, session=session)
task_instances_list2 = self.job_runner._executable_task_instances_to_queued(
max_tis=2, session=session
)
# Make sure we get TIs from a non-full pool in the 2nd list
assert len(task_instances_list2) > 0
assert all(
task_instance.pool != "test_scheduler_keeps_scheduling_pool_full_p1"
for task_instance in task_instances_list2
)
def test_scheduler_verify_priority_and_slots(self, dag_maker):
"""
Test task instances with higher priority are not queued
when pool does not have enough slots.
Though tasks with lower priority might be executed.
"""
with dag_maker(dag_id="test_scheduler_verify_priority_and_slots"):
# Medium priority, not enough slots
BashOperator(
task_id="test_scheduler_verify_priority_and_slots_t0",
pool="test_scheduler_verify_priority_and_slots",
pool_slots=2,
priority_weight=2,
bash_command="echo hi",
)
# High priority, occupies first slot
BashOperator(
task_id="test_scheduler_verify_priority_and_slots_t1",
pool="test_scheduler_verify_priority_and_slots",
pool_slots=1,
priority_weight=3,
bash_command="echo hi",
)
# Low priority, occupies second slot
BashOperator(
task_id="test_scheduler_verify_priority_and_slots_t2",
pool="test_scheduler_verify_priority_and_slots",
pool_slots=1,
priority_weight=1,
bash_command="echo hi",
)
session = settings.Session()
pool = Pool(pool="test_scheduler_verify_priority_and_slots", slots=2, include_deferred=False)
session.add(pool)
session.flush()
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
dr = dag_maker.create_dagrun()
for ti in dr.task_instances:
ti.state = State.SCHEDULED
session.merge(ti)
session.flush()
task_instances_list = self.job_runner._executable_task_instances_to_queued(
max_tis=32, session=session
)
# Only second and third
assert len(task_instances_list) == 2
ti0 = (
session.query(TaskInstance)
.filter(TaskInstance.task_id == "test_scheduler_verify_priority_and_slots_t0")
.first()
)
assert ti0.state == State.SCHEDULED
ti1 = (
session.query(TaskInstance)
.filter(TaskInstance.task_id == "test_scheduler_verify_priority_and_slots_t1")
.first()
)
assert ti1.state == State.QUEUED
ti2 = (
session.query(TaskInstance)
.filter(TaskInstance.task_id == "test_scheduler_verify_priority_and_slots_t2")
.first()
)
assert ti2.state == State.QUEUED
def test_verify_integrity_if_dag_not_changed(self, dag_maker):
# CleanUp
with create_session() as session:
session.query(SerializedDagModel).filter(
SerializedDagModel.dag_id == "test_verify_integrity_if_dag_not_changed"
).delete(synchronize_session=False)
with dag_maker(dag_id="test_verify_integrity_if_dag_not_changed") as dag:
BashOperator(task_id="dummy", bash_command="echo hi")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
orm_dag = dag_maker.dag_model
assert orm_dag is not None
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
dag = self.job_runner.dagbag.get_dag("test_verify_integrity_if_dag_not_changed", session=session)
self.job_runner._create_dag_runs([orm_dag], session)
drs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(drs) == 1
dr = drs[0]
# Verify that DagRun.verify_integrity is not called
with mock.patch("airflow.jobs.scheduler_job_runner.DagRun.verify_integrity") as mock_verify_integrity:
self.job_runner._schedule_dag_run(dr, session)
mock_verify_integrity.assert_not_called()
session.flush()
tis_count = (
session.query(func.count(TaskInstance.task_id))
.filter(
TaskInstance.dag_id == dr.dag_id,
TaskInstance.execution_date == dr.execution_date,
TaskInstance.task_id == dr.dag.tasks[0].task_id,
TaskInstance.state == State.SCHEDULED,
)
.scalar()
)
assert tis_count == 1
latest_dag_version = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
assert dr.dag_hash == latest_dag_version
session.rollback()
session.close()
def test_verify_integrity_if_dag_changed(self, dag_maker):
# CleanUp
with create_session() as session:
session.query(SerializedDagModel).filter(
SerializedDagModel.dag_id == "test_verify_integrity_if_dag_changed"
).delete(synchronize_session=False)
with dag_maker(dag_id="test_verify_integrity_if_dag_changed") as dag:
BashOperator(task_id="dummy", bash_command="echo hi")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
orm_dag = dag_maker.dag_model
assert orm_dag is not None
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
dag = self.job_runner.dagbag.get_dag("test_verify_integrity_if_dag_changed", session=session)
self.job_runner._create_dag_runs([orm_dag], session)
drs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(drs) == 1
dr = drs[0]
dag_version_1 = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
assert dr.dag_hash == dag_version_1
assert self.job_runner.dagbag.dags == {"test_verify_integrity_if_dag_changed": dag}
assert len(self.job_runner.dagbag.dags.get("test_verify_integrity_if_dag_changed").tasks) == 1
# Now let's say the DAG got updated (new task got added)
BashOperator(task_id="bash_task_1", dag=dag, bash_command="echo hi")
SerializedDagModel.write_dag(dag=dag)
dag_version_2 = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
assert dag_version_2 != dag_version_1
self.job_runner._schedule_dag_run(dr, session)
session.flush()
drs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(drs) == 1
dr = drs[0]
assert dr.dag_hash == dag_version_2
assert self.job_runner.dagbag.dags == {"test_verify_integrity_if_dag_changed": dag}
assert len(self.job_runner.dagbag.dags.get("test_verify_integrity_if_dag_changed").tasks) == 2
tis_count = (
session.query(func.count(TaskInstance.task_id))
.filter(
TaskInstance.dag_id == dr.dag_id,
TaskInstance.execution_date == dr.execution_date,
TaskInstance.state == State.SCHEDULED,
)
.scalar()
)
assert tis_count == 2
latest_dag_version = SerializedDagModel.get_latest_version_hash(dr.dag_id, session=session)
assert dr.dag_hash == latest_dag_version
session.rollback()
session.close()
def test_verify_integrity_if_dag_disappeared(self, dag_maker, caplog):
# CleanUp
with create_session() as session:
session.query(SerializedDagModel).filter(
SerializedDagModel.dag_id == "test_verify_integrity_if_dag_disappeared"
).delete(synchronize_session=False)
with dag_maker(dag_id="test_verify_integrity_if_dag_disappeared") as dag:
BashOperator(task_id="dummy", bash_command="echo hi")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
orm_dag = dag_maker.dag_model
assert orm_dag is not None
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
dag = self.job_runner.dagbag.get_dag("test_verify_integrity_if_dag_disappeared", session=session)
self.job_runner._create_dag_runs([orm_dag], session)
dag_id = dag.dag_id
drs = DagRun.find(dag_id=dag_id, session=session)
assert len(drs) == 1
dr = drs[0]
dag_version_1 = SerializedDagModel.get_latest_version_hash(dag_id, session=session)
assert dr.dag_hash == dag_version_1
assert self.job_runner.dagbag.dags == {"test_verify_integrity_if_dag_disappeared": dag}
assert len(self.job_runner.dagbag.dags.get("test_verify_integrity_if_dag_disappeared").tasks) == 1
SerializedDagModel.remove_dag(dag_id=dag_id)
dag = self.job_runner.dagbag.dags[dag_id]
self.job_runner.dagbag.dags = MagicMock()
self.job_runner.dagbag.dags.get.side_effect = [dag, None]
session.flush()
with caplog.at_level(logging.WARNING):
callback = self.job_runner._schedule_dag_run(dr, session)
assert "The DAG disappeared before verifying integrity" in caplog.text
assert callback is None
session.rollback()
session.close()
@pytest.mark.need_serialized_dag
def test_retry_still_in_executor(self, dag_maker):
"""
Checks if the scheduler does not put a task in limbo, when a task is retried
but is still present in the executor.
"""
executor = MockExecutor(do_update=False)
with create_session() as session:
with dag_maker(
dag_id="test_retry_still_in_executor",
schedule="@once",
session=session,
):
dag_task1 = BashOperator(
task_id="test_retry_handling_op",
bash_command="exit 1",
retries=1,
)
dag_maker.dag_model.calculate_dagrun_date_fields(dag_maker.dag, None)
@provide_session
def do_schedule(session):
# Use a empty file since the above mock will return the
# expected DAGs. Also specify only a single file so that it doesn't
# try to schedule the above DAG repeatedly.
scheduler_job = Job(
executor=executor,
)
self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=1, subdir=os.devnull)
self.job_runner.dagbag = dag_maker.dagbag
scheduler_job.heartrate = 0
# Since the DAG is not in the directory watched by scheduler job,
# it would've been marked as deleted and not being scheduled.
with mock.patch.object(DagModel, "deactivate_deleted_dags"):
run_job(scheduler_job, execute_callable=self.job_runner._execute)
do_schedule()
with create_session() as session:
ti = (
session.query(TaskInstance)
.filter(
TaskInstance.dag_id == "test_retry_still_in_executor",
TaskInstance.task_id == "test_retry_handling_op",
)
.first()
)
assert ti is not None, "Task not created by scheduler"
ti.task = dag_task1
def run_with_error(ti, ignore_ti_state=False):
with contextlib.suppress(AirflowException):
ti.run(ignore_ti_state=ignore_ti_state)
assert ti.try_number == 1
# At this point, scheduler has tried to schedule the task once and
# heartbeated the executor once, which moved the state of the task from
# SCHEDULED to QUEUED and then to SCHEDULED, to fail the task execution
# we need to ignore the TaskInstance state as SCHEDULED is not a valid state to start
# executing task.
run_with_error(ti, ignore_ti_state=True)
assert ti.state == State.UP_FOR_RETRY
assert ti.try_number == 1
with create_session() as session:
ti.refresh_from_db(lock_for_update=True, session=session)
ti.state = State.SCHEDULED
session.merge(ti)
# To verify that task does get re-queued.
executor.do_update = True
do_schedule()
ti.refresh_from_db()
assert ti.try_number == 1
assert ti.state == State.SUCCESS
def test_retry_handling_job(self):
"""
Integration test of the scheduler not accidentally resetting
the try_numbers for a task
"""
dag = self.dagbag.get_dag("test_retry_handling_job")
dag_task1 = dag.get_task("test_retry_handling_op")
dag.clear()
dag.sync_to_db()
scheduler_job = Job(job_type=SchedulerJobRunner.job_type, heartrate=0)
self.job_runner = SchedulerJobRunner(job=scheduler_job, num_runs=1)
self.job_runner.processor_agent = mock.MagicMock()
run_job(scheduler_job, execute_callable=self.job_runner._execute)
session = settings.Session()
ti = (
session.query(TaskInstance)
.filter(TaskInstance.dag_id == dag.dag_id, TaskInstance.task_id == dag_task1.task_id)
.first()
)
assert ti.state == State.UP_FOR_RETRY
def test_dag_get_active_runs(self, dag_maker):
"""
Test to check that a DAG returns its active runs
"""
now = timezone.utcnow()
six_hours_ago_to_the_hour = (now - datetime.timedelta(hours=6)).replace(
minute=0, second=0, microsecond=0
)
start_date = six_hours_ago_to_the_hour
dag_name1 = "get_active_runs_test"
default_args = {"depends_on_past": False, "start_date": start_date}
with dag_maker(dag_name1, schedule="* * * * *", max_active_runs=1, default_args=default_args) as dag1:
run_this_1 = EmptyOperator(task_id="run_this_1")
run_this_2 = EmptyOperator(task_id="run_this_2")
run_this_2.set_upstream(run_this_1)
run_this_3 = EmptyOperator(task_id="run_this_3")
run_this_3.set_upstream(run_this_2)
dr = dag_maker.create_dagrun()
# We had better get a dag run
assert dr is not None
execution_date = dr.execution_date
running_dates = dag1.get_active_runs()
try:
running_date = running_dates[0]
except Exception:
running_date = "Except"
assert execution_date == running_date, "Running Date must match Execution Date"
def test_list_py_file_paths(self):
"""
[JIRA-1357] Test the 'list_py_file_paths' function used by the
scheduler to list and load DAGs.
"""
detected_files = set()
expected_files = set()
# No_dags is empty, _invalid_ is ignored by .airflowignore
ignored_files = {
"no_dags.py",
"test_invalid_cron.py",
"test_invalid_dup_task.py",
"test_ignore_this.py",
"test_invalid_param.py",
"test_invalid_param2.py",
"test_invalid_param3.py",
"test_invalid_param4.py",
"test_nested_dag.py",
"test_imports.py",
"__init__.py",
}
for root, _, files in os.walk(TEST_DAG_FOLDER):
for file_name in files:
if file_name.endswith((".py", ".zip")):
if file_name not in ignored_files:
expected_files.add(f"{root}/{file_name}")
for file_path in list_py_file_paths(TEST_DAG_FOLDER, include_examples=False):
detected_files.add(file_path)
assert detected_files == expected_files
ignored_files = {
"helper.py",
}
example_dag_folder = airflow.example_dags.__path__[0]
for root, _, files in os.walk(example_dag_folder):
for file_name in files:
if file_name.endswith((".py", ".zip")):
if file_name not in ["__init__.py"] and file_name not in ignored_files:
expected_files.add(os.path.join(root, file_name))
detected_files.clear()
for file_path in list_py_file_paths(TEST_DAG_FOLDER, include_examples=True):
detected_files.add(file_path)
assert detected_files == expected_files
def test_adopt_or_reset_orphaned_tasks_nothing(self):
"""Try with nothing."""
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job)
session = settings.Session()
assert 0 == self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
@pytest.mark.parametrize(
"adoptable_state",
list(sorted(State.adoptable_states)),
)
def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, session):
dag_id = "test_adopt_or_reset_adoptable_tasks_" + adoptable_state.name
with dag_maker(dag_id=dag_id, schedule="@daily"):
task_id = dag_id + "_task"
EmptyOperator(task_id=task_id)
old_job = Job()
session.add(old_job)
session.commit()
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
dr1 = dag_maker.create_dagrun(external_trigger=True)
ti = dr1.get_task_instances(session=session)[0]
ti.state = adoptable_state
ti.queued_by_job_id = old_job.id
session.merge(ti)
session.merge(dr1)
session.commit()
num_reset_tis = self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
assert 1 == num_reset_tis
def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self, dag_maker, session):
dag_id = "test_reset_orphaned_tasks_external_triggered_dag"
with dag_maker(dag_id=dag_id, schedule="@daily"):
task_id = dag_id + "_task"
EmptyOperator(task_id=task_id)
old_job = Job()
session.add(old_job)
session.flush()
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
dr1 = dag_maker.create_dagrun(external_trigger=True)
ti = dr1.get_task_instances(session=session)[0]
ti.state = State.QUEUED
ti.queued_by_job_id = old_job.id
session.merge(ti)
session.merge(dr1)
session.commit()
num_reset_tis = self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
assert num_reset_tis == 1
def test_adopt_or_reset_orphaned_tasks_backfill_dag(self, dag_maker):
dag_id = "test_adopt_or_reset_orphaned_tasks_backfill_dag"
with dag_maker(dag_id=dag_id, schedule="@daily"):
task_id = dag_id + "_task"
EmptyOperator(task_id=task_id)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
session.add(scheduler_job)
session.flush()
dr1 = dag_maker.create_dagrun(run_type=DagRunType.BACKFILL_JOB)
ti = dr1.get_task_instances(session=session)[0]
ti.state = State.SCHEDULED
session.merge(ti)
session.merge(dr1)
session.flush()
assert dr1.is_backfill
assert 0 == self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
session.rollback()
def test_reset_orphaned_tasks_no_orphans(self, dag_maker):
dag_id = "test_reset_orphaned_tasks_no_orphans"
with dag_maker(dag_id=dag_id, schedule="@daily"):
task_id = dag_id + "_task"
EmptyOperator(task_id=task_id)
scheduler_job = Job()
scheduler_job.state = "running"
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
session.add(scheduler_job)
session.flush()
dr1 = dag_maker.create_dagrun()
tis = dr1.get_task_instances(session=session)
tis[0].state = State.RUNNING
tis[0].queued_by_job_id = scheduler_job.id
session.merge(dr1)
session.merge(tis[0])
session.flush()
assert self.job_runner.adopt_or_reset_orphaned_tasks(session=session) == 0
tis[0].refresh_from_db()
assert tis[0].state == State.RUNNING
def test_reset_orphaned_tasks_non_running_dagruns(self, dag_maker):
"""Ensure orphaned tasks with non-running dagruns are not reset."""
dag_id = "test_reset_orphaned_tasks_non_running_dagruns"
with dag_maker(dag_id=dag_id, schedule="@daily"):
task_id = dag_id + "_task"
EmptyOperator(task_id=task_id)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
session.add(scheduler_job)
session.flush()
dr1 = dag_maker.create_dagrun()
dr1.state = State.QUEUED
tis = dr1.get_task_instances(session=session)
assert 1 == len(tis)
tis[0].state = State.SCHEDULED
session.merge(dr1)
session.merge(tis[0])
session.flush()
assert 0 == self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
session.rollback()
def test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs(self, dag_maker):
dag_id = "test_adopt_or_reset_orphaned_tasks_stale_scheduler_jobs"
with dag_maker(dag_id=dag_id, schedule="@daily"):
EmptyOperator(task_id="task1")
EmptyOperator(task_id="task2")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
session = settings.Session()
scheduler_job.state = State.RUNNING
scheduler_job.latest_heartbeat = timezone.utcnow()
session.add(scheduler_job)
old_job = Job()
old_job_runner = SchedulerJobRunner(job=old_job, subdir=os.devnull)
old_job.state = State.RUNNING
old_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
session.add(old_job)
session.flush()
dr1 = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
start_date=timezone.utcnow(),
)
ti1, ti2 = dr1.get_task_instances(session=session)
dr1.state = State.RUNNING
ti1.state = State.QUEUED
ti1.queued_by_job_id = old_job.id
session.merge(dr1)
session.merge(ti1)
ti2.state = State.QUEUED
ti2.queued_by_job_id = scheduler_job.id
session.merge(ti2)
session.flush()
num_reset_tis = self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
assert 1 == num_reset_tis
session.refresh(ti1)
assert ti1.state is None
session.refresh(ti2)
assert ti2.state == State.QUEUED
session.rollback()
if old_job_runner.processor_agent:
old_job_runner.processor_agent.end()
def test_adopt_or_reset_orphaned_tasks_only_fails_scheduler_jobs(self, caplog):
"""Make sure we only set SchedulerJobs to failed, not all jobs"""
session = settings.Session()
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.state = State.RUNNING
scheduler_job.latest_heartbeat = timezone.utcnow()
session.add(scheduler_job)
session.flush()
old_job = Job()
self.job_runner = SchedulerJobRunner(job=old_job, subdir=os.devnull)
old_job.state = State.RUNNING
old_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
session.add(old_job)
session.flush()
old_task_job = Job(state=State.RUNNING)
old_task_job.latest_heartbeat = timezone.utcnow() - timedelta(minutes=15)
session.add(old_task_job)
session.flush()
with caplog.at_level("INFO", logger="airflow.jobs.scheduler_job_runner"):
self.job_runner.adopt_or_reset_orphaned_tasks(session=session)
session.expire_all()
assert old_job.state == State.FAILED
assert old_task_job.state == State.RUNNING
assert "Marked 1 SchedulerJob instances as failed" in caplog.messages
def test_send_sla_callbacks_to_processor_sla_disabled(self, dag_maker):
"""Test SLA Callbacks are not sent when check_slas is False"""
dag_id = "test_send_sla_callbacks_to_processor_sla_disabled"
with dag_maker(dag_id=dag_id, schedule="@daily") as dag:
EmptyOperator(task_id="task1")
with patch.object(settings, "CHECK_SLAS", False):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner._send_sla_callbacks_to_processor(dag)
scheduler_job.executor.callback_sink.send.assert_not_called()
def test_send_sla_callbacks_to_processor_sla_no_task_slas(self, dag_maker):
"""Test SLA Callbacks are not sent when no task SLAs are defined"""
dag_id = "test_send_sla_callbacks_to_processor_sla_no_task_slas"
with dag_maker(dag_id=dag_id, schedule="@daily") as dag:
EmptyOperator(task_id="task1")
with patch.object(settings, "CHECK_SLAS", True):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner._send_sla_callbacks_to_processor(dag)
scheduler_job.executor.callback_sink.send.assert_not_called()
@pytest.mark.parametrize(
"schedule",
[
"@daily",
"0 10 * * *",
timedelta(hours=2),
],
)
def test_send_sla_callbacks_to_processor_sla_with_task_slas(self, schedule, dag_maker):
"""Test SLA Callbacks are sent to the DAG Processor when SLAs are defined on tasks"""
dag_id = "test_send_sla_callbacks_to_processor_sla_with_task_slas"
with dag_maker(
dag_id=dag_id,
schedule=schedule,
processor_subdir=TEST_DAG_FOLDER,
) as dag:
EmptyOperator(task_id="task1", sla=timedelta(seconds=60))
with patch.object(settings, "CHECK_SLAS", True):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner._send_sla_callbacks_to_processor(dag)
expected_callback = SlaCallbackRequest(
full_filepath=dag.fileloc,
dag_id=dag.dag_id,
processor_subdir=TEST_DAG_FOLDER,
)
scheduler_job.executor.callback_sink.send.assert_called_once_with(expected_callback)
@pytest.mark.parametrize(
"schedule",
[
None,
[Dataset("foo")],
],
)
def test_send_sla_callbacks_to_processor_sla_dag_not_scheduled(self, schedule, dag_maker):
"""Test SLA Callbacks are not sent when DAG isn't scheduled"""
dag_id = "test_send_sla_callbacks_to_processor_sla_no_task_slas"
with dag_maker(dag_id=dag_id, schedule=schedule) as dag:
EmptyOperator(task_id="task1", sla=timedelta(seconds=5))
with patch.object(settings, "CHECK_SLAS", True):
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner._send_sla_callbacks_to_processor(dag)
scheduler_job.executor.callback_sink.send.assert_not_called()
@pytest.mark.parametrize(
"schedule, number_running, excepted",
[
(None, None, False),
("*/1 * * * *", None, False),
("*/1 * * * *", 1, True),
],
ids=["no_dag_schedule", "dag_schedule_too_many_runs", "dag_schedule_less_runs"],
)
def test_should_update_dag_next_dagruns(self, schedule, number_running, excepted, session, dag_maker):
"""Test if really required to update next dagrun or possible to save run time"""
with dag_maker(
dag_id="test_should_update_dag_next_dagruns", schedule=schedule, max_active_runs=2
) as dag:
EmptyOperator(task_id="dummy")
dag_model = dag_maker.dag_model
for index in range(2):
dag_maker.create_dagrun(
run_id=f"run_{index}",
execution_date=(DEFAULT_DATE + timedelta(days=index)),
start_date=timezone.utcnow(),
state=State.RUNNING,
session=session,
)
session.flush()
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
assert excepted is self.job_runner._should_update_dag_next_dagruns(
dag, dag_model, total_active_runs=number_running, session=session
)
@pytest.mark.parametrize(
"run_type, should_update",
[
(DagRunType.MANUAL, False),
(DagRunType.SCHEDULED, True),
(DagRunType.BACKFILL_JOB, True),
(DagRunType.DATASET_TRIGGERED, False),
],
ids=[
DagRunType.MANUAL.name,
DagRunType.SCHEDULED.name,
DagRunType.BACKFILL_JOB.name,
DagRunType.DATASET_TRIGGERED.name,
],
)
def test_should_update_dag_next_dagruns_after_run_type(self, run_type, should_update, session, dag_maker):
"""Test that whether next dagrun is updated depends on run type"""
with dag_maker(
dag_id="test_should_update_dag_next_dagruns_after_run_type",
schedule="*/1 * * * *",
max_active_runs=10,
) as dag:
EmptyOperator(task_id="dummy")
dag_model = dag_maker.dag_model
run = dag_maker.create_dagrun(
run_id="run",
run_type=run_type,
execution_date=DEFAULT_DATE,
start_date=timezone.utcnow(),
state=State.SUCCESS,
session=session,
)
session.flush()
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
assert should_update is self.job_runner._should_update_dag_next_dagruns(
dag, dag_model, last_dag_run=run, total_active_runs=0, session=session
)
def test_create_dag_runs(self, dag_maker):
"""
Test various invariants of _create_dag_runs.
- That the run created has the creating_job_id set
- That the run created is on QUEUED State
- That dag_model has next_dagrun
"""
with dag_maker(dag_id="test_create_dag_runs") as dag:
EmptyOperator(task_id="dummy")
dag_model = dag_maker.dag_model
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
with create_session() as session:
self.job_runner._create_dag_runs([dag_model], session)
dr = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).first()
# Assert dr state is queued
assert dr.state == State.QUEUED
assert dr.start_date is None
assert dag.get_last_dagrun().creating_job_id == scheduler_job.id
@pytest.mark.need_serialized_dag
def test_create_dag_runs_datasets(self, session, dag_maker):
"""
Test various invariants of _create_dag_runs.
- That the run created has the creating_job_id set
- That the run created is on QUEUED State
- That dag_model has next_dagrun
"""
dataset1 = Dataset(uri="ds1")
dataset2 = Dataset(uri="ds2")
with dag_maker(dag_id="datasets-1", start_date=timezone.utcnow(), session=session):
BashOperator(task_id="task", bash_command="echo 1", outlets=[dataset1])
dr = dag_maker.create_dagrun(
run_id="run1",
execution_date=(DEFAULT_DATE + timedelta(days=100)),
data_interval=(DEFAULT_DATE + timedelta(days=10), DEFAULT_DATE + timedelta(days=11)),
)
ds1_id = session.query(DatasetModel.id).filter_by(uri=dataset1.uri).scalar()
event1 = DatasetEvent(
dataset_id=ds1_id,
source_task_id="task",
source_dag_id=dr.dag_id,
source_run_id=dr.run_id,
source_map_index=-1,
)
session.add(event1)
# Create a second event, creation time is more recent, but data interval is older
dr = dag_maker.create_dagrun(
run_id="run2",
execution_date=(DEFAULT_DATE + timedelta(days=101)),
data_interval=(DEFAULT_DATE + timedelta(days=5), DEFAULT_DATE + timedelta(days=6)),
)
event2 = DatasetEvent(
dataset_id=ds1_id,
source_task_id="task",
source_dag_id=dr.dag_id,
source_run_id=dr.run_id,
source_map_index=-1,
)
session.add(event2)
with dag_maker(dag_id="datasets-consumer-multiple", schedule=[dataset1, dataset2]):
pass
dag2 = dag_maker.dag
with dag_maker(dag_id="datasets-consumer-single", schedule=[dataset1]):
pass
dag3 = dag_maker.dag
session = dag_maker.session
session.add_all(
[
DatasetDagRunQueue(dataset_id=ds1_id, target_dag_id=dag2.dag_id),
DatasetDagRunQueue(dataset_id=ds1_id, target_dag_id=dag3.dag_id),
]
)
session.flush()
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
with create_session() as session:
self.job_runner._create_dagruns_for_dags(session, session)
def dict_from_obj(obj):
"""Get dict of column attrs from SqlAlchemy object."""
return {k.key: obj.__dict__.get(k) for k in obj.__mapper__.column_attrs}
# dag3 should be triggered since it only depends on dataset1, and it's been queued
created_run = session.query(DagRun).filter(DagRun.dag_id == dag3.dag_id).one()
assert created_run.state == State.QUEUED
assert created_run.start_date is None
# we don't have __eq__ defined on DatasetEvent because... given the fact that in the future
# we may register events from other systems, dataset_id + timestamp might not be enough PK
assert list(map(dict_from_obj, created_run.consumed_dataset_events)) == list(
map(dict_from_obj, [event1, event2])
)
assert created_run.data_interval_start == DEFAULT_DATE + timedelta(days=5)
assert created_run.data_interval_end == DEFAULT_DATE + timedelta(days=11)
# dag2 DDRQ record should still be there since the dag run was *not* triggered
assert session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag2.dag_id).one() is not None
# dag2 should not be triggered since it depends on both dataset 1 and 2
assert session.query(DagRun).filter(DagRun.dag_id == dag2.dag_id).one_or_none() is None
# dag3 DDRQ record should be deleted since the dag run was triggered
assert session.query(DatasetDagRunQueue).filter_by(target_dag_id=dag3.dag_id).one_or_none() is None
assert dag3.get_last_dagrun().creating_job_id == scheduler_job.id
@pytest.mark.need_serialized_dag
@pytest.mark.parametrize(
"disable, enable",
[
pytest.param({"is_active": False}, {"is_active": True}, id="active"),
pytest.param({"is_paused": True}, {"is_paused": False}, id="paused"),
],
)
def test_no_create_dag_runs_when_dag_disabled(self, session, dag_maker, disable, enable):
ds = Dataset("ds")
with dag_maker(dag_id="consumer", schedule=[ds], session=session):
pass
with dag_maker(dag_id="producer", schedule="@daily", session=session):
BashOperator(task_id="task", bash_command="echo 1", outlets=ds)
dsm = DatasetManager()
ds_id = session.scalars(select(DatasetModel.id).filter_by(uri=ds.uri)).one()
dse_q = select(DatasetEvent).where(DatasetEvent.dataset_id == ds_id).order_by(DatasetEvent.timestamp)
ddrq_q = select(DatasetDagRunQueue).where(
DatasetDagRunQueue.dataset_id == ds_id, DatasetDagRunQueue.target_dag_id == "consumer"
)
# Simulate the consumer DAG being disabled.
session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**disable))
# A DDRQ is not scheduled although an event is emitted.
dr1: DagRun = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
dsm.register_dataset_change(
task_instance=dr1.get_task_instance("task", session=session),
dataset=ds,
session=session,
)
assert session.scalars(dse_q).one().source_run_id == dr1.run_id
assert session.scalars(ddrq_q).one_or_none() is None
# Simulate the consumer DAG being enabled.
session.execute(update(DagModel).where(DagModel.dag_id == "consumer").values(**enable))
# A DDRQ should be scheduled for the new event, but not the previous one.
dr2: DagRun = dag_maker.create_dagrun_after(dr1, run_type=DagRunType.SCHEDULED)
dsm.register_dataset_change(
task_instance=dr2.get_task_instance("task", session=session),
dataset=ds,
session=session,
)
assert [e.source_run_id for e in session.scalars(dse_q)] == [dr1.run_id, dr2.run_id]
assert session.scalars(ddrq_q).one().target_dag_id == "consumer"
@time_machine.travel(DEFAULT_DATE + datetime.timedelta(days=1, seconds=9), tick=False)
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.timing")
def test_start_dagruns(self, stats_timing, dag_maker):
"""
Test that _start_dagrun:
- moves runs to RUNNING State
- emit the right DagRun metrics
"""
with dag_maker(dag_id="test_start_dag_runs") as dag:
EmptyOperator(
task_id="dummy",
)
dag_model = dag_maker.dag_model
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
with create_session() as session:
self.job_runner._create_dag_runs([dag_model], session)
self.job_runner._start_queued_dagruns(session)
dr = session.query(DagRun).filter(DagRun.dag_id == dag.dag_id).first()
# Assert dr state is running
assert dr.state == State.RUNNING
stats_timing.assert_has_calls(
[
mock.call(
"dagrun.schedule_delay.test_start_dag_runs",
datetime.timedelta(seconds=9),
),
mock.call(
"dagrun.schedule_delay",
datetime.timedelta(seconds=9),
tags={"dag_id": "test_start_dag_runs"},
),
]
)
assert dag.get_last_dagrun().creating_job_id == scheduler_job.id
def test_extra_operator_links_not_loaded_in_scheduler_loop(self, dag_maker):
"""
Test that Operator links are not loaded inside the Scheduling Loop (that does not include
DagFileProcessorProcess) especially the critical loop of the Scheduler.
This is to avoid running User code in the Scheduler and prevent any deadlocks
"""
with dag_maker(dag_id="test_extra_operator_links_not_loaded_in_scheduler") as dag:
# This CustomOperator has Extra Operator Links registered via plugins
_ = CustomOperator(task_id="custom_task")
custom_task = dag.task_dict["custom_task"]
# Test that custom_task has >= 1 Operator Links (after de-serialization)
assert custom_task.operator_extra_links
session = settings.Session()
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
self.job_runner._start_queued_dagruns(session)
session.flush()
# Get serialized dag
s_dag_2 = self.job_runner.dagbag.get_dag(dag.dag_id)
custom_task = s_dag_2.task_dict["custom_task"]
# Test that custom_task has no Operator Links (after de-serialization) in the Scheduling Loop
assert not custom_task.operator_extra_links
def test_scheduler_create_dag_runs_does_not_raise_error(self, caplog, dag_maker):
"""
Test that scheduler._create_dag_runs does not raise an error when the DAG does not exist
in serialized_dag table
"""
with dag_maker(dag_id="test_scheduler_create_dag_runs_does_not_raise_error", serialized=False):
EmptyOperator(
task_id="dummy",
)
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
caplog.set_level("FATAL")
caplog.clear()
with create_session() as session, caplog.at_level(
"ERROR",
logger="airflow.jobs.scheduler_job_runner",
):
self.job_runner._create_dag_runs([dag_maker.dag_model], session)
assert caplog.messages == [
"DAG 'test_scheduler_create_dag_runs_does_not_raise_error' not found in serialized_dag table",
]
def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, dag_maker):
"""
Test that externally triggered Dag Runs should not affect (by skipping) next
scheduled DAG runs
"""
with dag_maker(
dag_id="test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run",
schedule="*/1 * * * *",
max_active_runs=5,
catchup=True,
) as dag:
EmptyOperator(task_id="dummy")
session = settings.Session()
# Verify that dag_model.next_dagrun is equal to next execution_date
dag_model = dag_maker.dag_model
assert dag_model.next_dagrun == DEFAULT_DATE
assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE
assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + timedelta(minutes=1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor(do_update=False)
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
# Verify a DagRun is created with the correct dates
# when Scheduler._do_scheduling is run in the Scheduler Loop
self.job_runner._do_scheduling(session)
dr1 = dag.get_dagrun(DEFAULT_DATE, session=session)
assert dr1 is not None
assert dr1.state == State.RUNNING
assert dr1.execution_date == DEFAULT_DATE
assert dr1.data_interval_start == DEFAULT_DATE
assert dr1.data_interval_end == DEFAULT_DATE + timedelta(minutes=1)
# Verify that dag_model.next_dagrun is set to next interval
dag_model = session.get(DagModel, dag.dag_id)
assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE + timedelta(minutes=1)
assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + timedelta(minutes=2)
# Trigger the Dag externally
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dr = dag.create_dagrun(
state=State.RUNNING,
execution_date=timezone.utcnow(),
run_type=DagRunType.MANUAL,
session=session,
external_trigger=True,
data_interval=data_interval,
)
assert dr is not None
# Run DAG.bulk_write_to_db -- this is run when in DagFileProcessor.process_file
DAG.bulk_write_to_db([dag], session=session)
# Test that 'dag_model.next_dagrun' has not been changed because of newly created external
# triggered DagRun.
dag_model = session.get(DagModel, dag.dag_id)
assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(minutes=1)
assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE + timedelta(minutes=1)
assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + timedelta(minutes=2)
def test_scheduler_create_dag_runs_check_existing_run(self, dag_maker):
"""
Test that if a dag run exists, scheduler._create_dag_runs does not raise an error.
And if a Dag Run does not exist it creates next Dag Run. In both cases the Scheduler
sets next execution date as DagModel.next_dagrun
"""
with dag_maker(
dag_id="test_scheduler_create_dag_runs_check_existing_run",
schedule=timedelta(days=1),
) as dag:
EmptyOperator(
task_id="dummy",
)
session = settings.Session()
assert dag.get_last_dagrun(session) is None
dag_model = dag_maker.dag_model
# Assert dag_model.next_dagrun is set correctly
assert dag_model.next_dagrun == DEFAULT_DATE
dagrun = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=dag_model.next_dagrun,
start_date=timezone.utcnow(),
state=State.RUNNING,
external_trigger=False,
session=session,
creating_job_id=2,
)
session.flush()
assert dag.get_last_dagrun(session) == dagrun
scheduler_job = Job(executor=self.null_exec)
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
# Test that this does not raise any error
self.job_runner._create_dag_runs([dag_model], session)
# Assert the next dagrun fields are set correctly to next execution date
assert dag_model.next_dagrun_data_interval_start == DEFAULT_DATE + timedelta(days=1)
assert dag_model.next_dagrun_data_interval_end == DEFAULT_DATE + timedelta(days=2)
assert dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
session.rollback()
@conf_vars({("scheduler", "use_job_schedule"): "false"})
def test_do_schedule_max_active_runs_dag_timed_out(self, dag_maker):
"""Test that tasks are set to a finished state when their DAG times out"""
with dag_maker(
dag_id="test_max_active_run_with_dag_timed_out",
schedule="@once",
max_active_runs=1,
catchup=True,
dagrun_timeout=datetime.timedelta(seconds=1),
) as dag:
task1 = BashOperator(
task_id="task1",
bash_command=' for((i=1;i<=600;i+=1)); do sleep "$i"; done',
)
session = settings.Session()
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
run1 = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE,
state=State.RUNNING,
start_date=timezone.utcnow() - timedelta(seconds=2),
session=session,
data_interval=data_interval,
)
run1_ti = run1.get_task_instance(task1.task_id, session)
run1_ti.state = State.RUNNING
run2 = dag.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE + timedelta(seconds=10),
state=State.QUEUED,
session=session,
data_interval=data_interval,
)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
my_dag = session.get(DagModel, dag.dag_id)
self.job_runner._create_dag_runs([my_dag], session)
# Run relevant part of scheduling again to assert run2 has been scheduled
self.job_runner._schedule_dag_run(run1, session)
run1 = session.merge(run1)
session.refresh(run1)
assert run1.state == State.FAILED
assert run1_ti.state == State.SKIPPED
session.flush()
# Run relevant part of scheduling again to assert run2 has been scheduled
self.job_runner._start_queued_dagruns(session)
session.flush()
run2 = session.merge(run2)
session.refresh(run2)
assert run2.state == State.RUNNING
self.job_runner._schedule_dag_run(run2, session)
run2_ti = run2.get_task_instance(task1.task_id, session)
assert run2_ti.state == State.SCHEDULED
def test_do_schedule_max_active_runs_task_removed(self, session, dag_maker):
"""Test that tasks in removed state don't count as actively running."""
with dag_maker(
dag_id="test_do_schedule_max_active_runs_task_removed",
start_date=DEFAULT_DATE,
schedule="@once",
max_active_runs=1,
session=session,
):
# Can't use EmptyOperator as that goes straight to success
BashOperator(task_id="dummy1", bash_command="true")
run1 = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
execution_date=DEFAULT_DATE + timedelta(hours=1),
state=State.RUNNING,
)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor(do_update=False)
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
num_queued = self.job_runner._do_scheduling(session)
assert num_queued == 1
session.flush()
ti = run1.task_instances[0]
ti.refresh_from_db(session=session)
assert ti.state == State.QUEUED
def test_more_runs_are_not_created_when_max_active_runs_is_reached(self, dag_maker, caplog):
"""
This tests that when max_active_runs is reached, _create_dag_runs doesn't create
more dagruns
"""
with dag_maker(max_active_runs=1):
EmptyOperator(task_id="task")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor(do_update=False)
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
session = settings.Session()
assert session.query(DagRun).count() == 0
query, _ = DagModel.dags_needing_dagruns(session)
dag_models = query.all()
self.job_runner._create_dag_runs(dag_models, session)
dr = session.query(DagRun).one()
dr.state == DagRunState.QUEUED
assert session.query(DagRun).count() == 1
assert dag_maker.dag_model.next_dagrun_create_after is None
session.flush()
# dags_needing_dagruns query should not return any value
query, _ = DagModel.dags_needing_dagruns(session)
assert len(query.all()) == 0
self.job_runner._create_dag_runs(dag_models, session)
assert session.query(DagRun).count() == 1
assert dag_maker.dag_model.next_dagrun_create_after is None
assert dag_maker.dag_model.next_dagrun == DEFAULT_DATE
# set dagrun to success
dr = session.query(DagRun).one()
dr.state = DagRunState.SUCCESS
ti = dr.get_task_instance("task", session)
ti.state = TaskInstanceState.SUCCESS
session.merge(ti)
session.merge(dr)
session.flush()
# check that next_dagrun is set properly by Schedulerjob._update_dag_next_dagruns
self.job_runner._schedule_dag_run(dr, session)
session.flush()
query, _ = DagModel.dags_needing_dagruns(session)
assert len(query.all()) == 1
# assert next_dagrun has been updated correctly
assert dag_maker.dag_model.next_dagrun == DEFAULT_DATE + timedelta(days=1)
# assert no dagruns is created yet
assert (
session.query(DagRun).filter(DagRun.state.in_([DagRunState.RUNNING, DagRunState.QUEUED])).count()
== 0
)
def test_max_active_runs_creation_phasing(self, dag_maker, session):
"""
Test that when creating runs once max_active_runs is reached that the runs come in the right order
without gaps
"""
def complete_one_dagrun():
ti = (
session.query(TaskInstance)
.join(TaskInstance.dag_run)
.filter(TaskInstance.state != State.SUCCESS)
.order_by(DagRun.execution_date)
.first()
)
if ti:
ti.state = State.SUCCESS
session.flush()
self.clean_db()
with dag_maker(max_active_runs=3, session=session) as dag:
# Need to use something that doesn't immediately get marked as success by the scheduler
BashOperator(task_id="task", bash_command="true")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor(do_update=True)
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
query, _ = DagModel.dags_needing_dagruns(session)
query.all()
for _ in range(3):
self.job_runner._do_scheduling(session)
model: DagModel = session.get(DagModel, dag.dag_id)
# Pre-condition
assert DagRun.active_runs_of_dags(session=session) == {"test_dag": 3}
assert model.next_dagrun == timezone.DateTime(2016, 1, 3, tzinfo=UTC)
assert model.next_dagrun_create_after is None
complete_one_dagrun()
assert DagRun.active_runs_of_dags(session=session) == {"test_dag": 3}
for _ in range(5):
self.job_runner._do_scheduling(session)
complete_one_dagrun()
expected_execution_dates = [datetime.datetime(2016, 1, d, tzinfo=timezone.utc) for d in range(1, 6)]
dagrun_execution_dates = [
dr.execution_date for dr in session.query(DagRun).order_by(DagRun.execution_date).all()
]
assert dagrun_execution_dates == expected_execution_dates
def test_do_schedule_max_active_runs_and_manual_trigger(self, dag_maker):
"""
Make sure that when a DAG is already at max_active_runs, that manually triggered
dagruns don't start running.
"""
with dag_maker(
dag_id="test_max_active_run_plus_manual_trigger",
schedule="@once",
max_active_runs=1,
) as dag:
# Can't use EmptyOperator as that goes straight to success
task1 = BashOperator(task_id="dummy1", bash_command="true")
task2 = BashOperator(task_id="dummy2", bash_command="true")
task1 >> task2
BashOperator(task_id="dummy3", bash_command="true")
session = settings.Session()
dag_run = dag_maker.create_dagrun(
state=State.QUEUED,
session=session,
)
dag.sync_to_db(session=session) # Update the date fields
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor(do_update=False)
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
num_queued = self.job_runner._do_scheduling(session)
# Add it back in to the session so we can refresh it. (_do_scheduling does an expunge_all to reduce
# memory)
dag_run = session.merge(dag_run)
session.refresh(dag_run)
assert num_queued == 2
assert dag_run.state == State.RUNNING
# Now that this one is running, manually trigger a dag.
dag_maker.create_dagrun(
run_type=DagRunType.MANUAL,
execution_date=DEFAULT_DATE + timedelta(hours=1),
state=State.QUEUED,
session=session,
)
session.flush()
self.job_runner._do_scheduling(session)
# Assert that only 1 dagrun is active
assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
# Assert that the other one is queued
assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 1
def test_max_active_runs_in_a_dag_doesnt_stop_running_dagruns_in_otherdags(self, dag_maker):
session = settings.Session()
with dag_maker(
"test_dag1",
start_date=DEFAULT_DATE,
schedule=timedelta(hours=1),
max_active_runs=1,
):
EmptyOperator(task_id="mytask")
dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED)
for _ in range(29):
dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, state=State.QUEUED)
with dag_maker(
"test_dag2",
start_date=timezone.datetime(2020, 1, 1),
schedule=timedelta(hours=1),
):
EmptyOperator(task_id="mytask")
dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED)
for _ in range(9):
dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, state=State.QUEUED)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor(do_update=False)
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
self.job_runner._start_queued_dagruns(session)
session.flush()
self.job_runner._start_queued_dagruns(session)
session.flush()
dag1_running_count = (
session.query(func.count(DagRun.id))
.filter(DagRun.dag_id == "test_dag1", DagRun.state == State.RUNNING)
.scalar()
)
running_count = session.query(func.count(DagRun.id)).filter(DagRun.state == State.RUNNING).scalar()
assert dag1_running_count == 1
assert running_count == 11
def test_start_queued_dagruns_do_follow_execution_date_order(self, dag_maker):
session = settings.Session()
with dag_maker("test_dag1", max_active_runs=1):
EmptyOperator(task_id="mytask")
date = DEFAULT_DATE
for i in range(30):
dr = dag_maker.create_dagrun(
run_id=f"dagrun_{i}", run_type=DagRunType.SCHEDULED, state=State.QUEUED, execution_date=date
)
date = dr.execution_date + timedelta(hours=1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor(do_update=False)
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
self.job_runner._start_queued_dagruns(session)
session.flush()
dr = DagRun.find(run_id="dagrun_0")
ti = dr[0].get_task_instance(task_id="mytask", session=session)
ti.state = State.SUCCESS
session.merge(ti)
session.commit()
assert dr[0].state == State.RUNNING
dr[0].state = State.SUCCESS
session.merge(dr[0])
session.flush()
assert dr[0].state == State.SUCCESS
self.job_runner._start_queued_dagruns(session)
session.flush()
dr = DagRun.find(run_id="dagrun_1")
assert len(session.query(DagRun).filter(DagRun.state == State.RUNNING).all()) == 1
assert dr[0].state == State.RUNNING
def test_no_dagruns_would_stuck_in_running(self, dag_maker):
# Test that running dagruns are not stuck in running.
# Create one dagrun in 'running' state and 1 in 'queued' state from one dag(max_active_runs=1)
# Create 16 dagruns in 'running' state and 16 in 'queued' state from another dag
# Create 16 dagruns in 'running' state and 16 in 'queued' state from yet another dag
# Finish the task of the first dag, and check that another dagrun starts running
# from the first dag.
session = settings.Session()
# first dag and dagruns
date = timezone.datetime(2016, 1, 1)
logical_date = timezone.coerce_datetime(date)
with dag_maker("test_dagrun_states_are_correct_1", max_active_runs=1, start_date=date) as dag:
task1 = EmptyOperator(task_id="dummy_task")
dr1_running = dag_maker.create_dagrun(run_id="dr1_run_1", execution_date=date)
data_interval = dag.infer_automated_data_interval(logical_date)
dag_maker.create_dagrun(
run_id="dr1_run_2",
state=State.QUEUED,
execution_date=dag.next_dagrun_info(
last_automated_dagrun=data_interval, restricted=False
).data_interval.start,
)
# second dag and dagruns
date = timezone.datetime(2020, 1, 1)
with dag_maker("test_dagrun_states_are_correct_2", start_date=date) as dag:
EmptyOperator(task_id="dummy_task")
for i in range(16):
dr = dag_maker.create_dagrun(run_id=f"dr2_run_{i+1}", state=State.RUNNING, execution_date=date)
date = dr.execution_date + timedelta(hours=1)
dr16 = DagRun.find(run_id="dr2_run_16")
date = dr16[0].execution_date + timedelta(hours=1)
for i in range(16, 32):
dr = dag_maker.create_dagrun(run_id=f"dr2_run_{i+1}", state=State.QUEUED, execution_date=date)
date = dr.execution_date + timedelta(hours=1)
# third dag and dagruns
date = timezone.datetime(2021, 1, 1)
with dag_maker("test_dagrun_states_are_correct_3", start_date=date) as dag:
EmptyOperator(task_id="dummy_task")
for i in range(16):
dr = dag_maker.create_dagrun(run_id=f"dr3_run_{i+1}", state=State.RUNNING, execution_date=date)
date = dr.execution_date + timedelta(hours=1)
dr16 = DagRun.find(run_id="dr3_run_16")
date = dr16[0].execution_date + timedelta(hours=1)
for i in range(16, 32):
dr = dag_maker.create_dagrun(run_id=f"dr2_run_{i+1}", state=State.QUEUED, execution_date=date)
date = dr.execution_date + timedelta(hours=1)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor(do_update=False)
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
ti = TaskInstance(task=task1, run_id=dr1_running.run_id)
ti.refresh_from_db()
ti.state = State.SUCCESS
session.merge(ti)
session.flush()
# Run the scheduler loop
with mock.patch.object(settings, "USE_JOB_SCHEDULE", False):
self.job_runner._do_scheduling(session)
self.job_runner._do_scheduling(session)
assert DagRun.find(run_id="dr1_run_1")[0].state == State.SUCCESS
assert DagRun.find(run_id="dr1_run_2")[0].state == State.RUNNING
@pytest.mark.parametrize(
"state, start_date, end_date",
[
[State.NONE, None, None],
[
State.UP_FOR_RETRY,
timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15),
],
[
State.UP_FOR_RESCHEDULE,
timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15),
],
],
)
def test_dag_file_processor_process_task_instances(self, state, start_date, end_date, dag_maker):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
with dag_maker(dag_id="test_scheduler_process_execute_task"):
BashOperator(task_id="dummy", bash_command="echo hi")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED)
assert dr is not None
with create_session() as session:
ti = dr.get_task_instances(session=session)[0]
ti.state = state
ti.start_date = start_date
ti.end_date = end_date
self.job_runner._schedule_dag_run(dr, session)
assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 1
session.refresh(ti)
assert ti.state == State.SCHEDULED
@pytest.mark.parametrize(
"state,start_date,end_date",
[
[State.NONE, None, None],
[
State.UP_FOR_RETRY,
timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15),
],
[
State.UP_FOR_RESCHEDULE,
timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15),
],
],
)
def test_dag_file_processor_process_task_instances_with_max_active_tis_per_dag(
self, state, start_date, end_date, dag_maker
):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
with dag_maker(dag_id="test_scheduler_process_execute_task_with_max_active_tis_per_dag"):
BashOperator(task_id="dummy", max_active_tis_per_dag=2, bash_command="echo Hi")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
dr = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
)
assert dr is not None
with create_session() as session:
ti = dr.get_task_instances(session=session)[0]
ti.state = state
ti.start_date = start_date
ti.end_date = end_date
self.job_runner._schedule_dag_run(dr, session)
assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 1
session.refresh(ti)
assert ti.state == State.SCHEDULED
@pytest.mark.parametrize(
"state,start_date,end_date",
[
[State.NONE, None, None],
[
State.UP_FOR_RETRY,
timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15),
],
[
State.UP_FOR_RESCHEDULE,
timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15),
],
],
)
def test_dag_file_processor_process_task_instances_with_max_active_tis_per_dagrun(
self, state, start_date, end_date, dag_maker
):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
with dag_maker(dag_id="test_scheduler_process_execute_task_with_max_active_tis_per_dagrun"):
BashOperator(task_id="dummy", max_active_tis_per_dagrun=2, bash_command="echo Hi")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
dr = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
)
assert dr is not None
with create_session() as session:
ti = dr.get_task_instances(session=session)[0]
ti.state = state
ti.start_date = start_date
ti.end_date = end_date
self.job_runner._schedule_dag_run(dr, session)
assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 1
session.refresh(ti)
assert ti.state == State.SCHEDULED
@pytest.mark.parametrize(
"state, start_date, end_date",
[
[State.NONE, None, None],
[
State.UP_FOR_RETRY,
timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15),
],
[
State.UP_FOR_RESCHEDULE,
timezone.utcnow() - datetime.timedelta(minutes=30),
timezone.utcnow() - datetime.timedelta(minutes=15),
],
],
)
def test_dag_file_processor_process_task_instances_depends_on_past(
self, state, start_date, end_date, dag_maker
):
"""
Test if _process_task_instances puts the right task instances into the
mock_list.
"""
with dag_maker(
dag_id="test_scheduler_process_execute_task_depends_on_past",
default_args={
"depends_on_past": True,
},
):
BashOperator(task_id="dummy1", bash_command="echo hi")
BashOperator(task_id="dummy2", bash_command="echo hi")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
dr = dag_maker.create_dagrun(
run_type=DagRunType.SCHEDULED,
)
assert dr is not None
with create_session() as session:
tis = dr.get_task_instances(session=session)
for ti in tis:
ti.state = state
ti.start_date = start_date
ti.end_date = end_date
self.job_runner._schedule_dag_run(dr, session)
assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 2
session.refresh(tis[0])
session.refresh(tis[1])
assert tis[0].state == State.SCHEDULED
assert tis[1].state == State.SCHEDULED
def test_scheduler_job_add_new_task(self, dag_maker):
"""
Test if a task instance will be added if the dag is updated
"""
with dag_maker(dag_id="test_scheduler_add_new_task") as dag:
BashOperator(task_id="dummy", bash_command="echo test")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.dagbag = dag_maker.dagbag
session = settings.Session()
orm_dag = dag_maker.dag_model
assert orm_dag is not None
if self.job_runner.processor_agent:
self.job_runner.processor_agent.end()
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
dag = self.job_runner.dagbag.get_dag("test_scheduler_add_new_task", session=session)
self.job_runner._create_dag_runs([orm_dag], session)
drs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(drs) == 1
dr = drs[0]
tis = dr.get_task_instances()
assert len(tis) == 1
BashOperator(task_id="dummy2", dag=dag, bash_command="echo test")
SerializedDagModel.write_dag(dag=dag)
self.job_runner._schedule_dag_run(dr, session)
assert session.query(TaskInstance).filter_by(state=State.SCHEDULED).count() == 2
session.flush()
drs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(drs) == 1
dr = drs[0]
tis = dr.get_task_instances()
assert len(tis) == 2
def test_runs_respected_after_clear(self, dag_maker):
"""
Test dag after dag.clear, max_active_runs is respected
"""
with dag_maker(
dag_id="test_scheduler_max_active_runs_respected_after_clear",
start_date=DEFAULT_DATE,
max_active_runs=1,
) as dag:
BashOperator(task_id="dummy", bash_command="echo Hi")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
session = settings.Session()
dr = dag_maker.create_dagrun(run_type=DagRunType.SCHEDULED, state=State.QUEUED)
dr = dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, state=State.QUEUED)
dag_maker.create_dagrun_after(dr, run_type=DagRunType.SCHEDULED, state=State.QUEUED)
dag.clear()
assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 3
session = settings.Session()
self.job_runner._start_queued_dagruns(session)
session.flush()
# Assert that only 1 dagrun is active
assert len(DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)) == 1
# Assert that the other two are queued
assert len(DagRun.find(dag_id=dag.dag_id, state=State.QUEUED, session=session)) == 2
def test_timeout_triggers(self, dag_maker):
"""
Tests that tasks in the deferred state, but whose trigger timeout
has expired, are correctly failed.
"""
session = settings.Session()
# Create the test DAG and task
with dag_maker(
dag_id="test_timeout_triggers",
start_date=DEFAULT_DATE,
schedule="@once",
max_active_runs=1,
session=session,
):
EmptyOperator(task_id="dummy1")
# Create a Task Instance for the task that is allegedly deferred
# but past its timeout, and one that is still good.
# We don't actually need a linked trigger here; the code doesn't check.
dr1 = dag_maker.create_dagrun()
dr2 = dag_maker.create_dagrun(
run_id="test2", execution_date=DEFAULT_DATE + datetime.timedelta(seconds=1)
)
ti1 = dr1.get_task_instance("dummy1", session)
ti2 = dr2.get_task_instance("dummy1", session)
ti1.state = State.DEFERRED
ti1.trigger_timeout = timezone.utcnow() - datetime.timedelta(seconds=60)
ti2.state = State.DEFERRED
ti2.trigger_timeout = timezone.utcnow() + datetime.timedelta(seconds=60)
session.flush()
# Boot up the scheduler and make it check timeouts
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.check_trigger_timeouts(session=session)
# Make sure that TI1 is now scheduled to fail, and 2 wasn't touched
session.refresh(ti1)
session.refresh(ti2)
assert ti1.state == State.SCHEDULED
assert ti1.next_method == "__fail__"
assert ti2.state == State.DEFERRED
def test_find_zombies_nothing(self):
executor = MockExecutor(do_update=False)
scheduler_job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(scheduler_job)
self.job_runner.processor_agent = mock.MagicMock()
self.job_runner._find_zombies()
scheduler_job.executor.callback_sink.send.assert_not_called()
def test_find_zombies(self, load_examples):
dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
with create_session() as session:
session.query(Job).delete()
dag = dagbag.get_dag("example_branch_operator")
dag.sync_to_db()
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dag_run = dag.create_dagrun(
state=DagRunState.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
)
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner.processor_agent = mock.MagicMock()
# We will provision 2 tasks so we can check we only find zombies from this scheduler
tasks_to_setup = ["branching", "run_this_first"]
for task_id in tasks_to_setup:
task = dag.get_task(task_id=task_id)
ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
ti.queued_by_job_id = 999
local_job = Job(dag_id=ti.dag_id)
LocalTaskJobRunner(job=local_job, task_instance=ti)
local_job.state = TaskInstanceState.FAILED
session.add(local_job)
session.flush()
ti.job_id = local_job.id
session.add(ti)
session.flush()
assert task.task_id == "run_this_first" # Make sure we have the task/ti we expect
ti.queued_by_job_id = scheduler_job.id
session.flush()
self.job_runner._find_zombies()
scheduler_job.executor.callback_sink.send.assert_called_once()
requests = scheduler_job.executor.callback_sink.send.call_args.args
assert 1 == len(requests)
assert requests[0].full_filepath == dag.fileloc
assert requests[0].msg == str(self.job_runner._generate_zombie_message_details(ti))
assert requests[0].is_failure_callback is True
assert isinstance(requests[0].simple_task_instance, SimpleTaskInstance)
assert ti.dag_id == requests[0].simple_task_instance.dag_id
assert ti.task_id == requests[0].simple_task_instance.task_id
assert ti.run_id == requests[0].simple_task_instance.run_id
assert ti.map_index == requests[0].simple_task_instance.map_index
with create_session() as session:
session.query(TaskInstance).delete()
session.query(Job).delete()
def test_zombie_message(self, load_examples):
"""
Check that the zombie message comes out as expected
"""
dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
with create_session() as session:
session.query(Job).delete()
dag = dagbag.get_dag("example_branch_operator")
dag.sync_to_db()
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dag_run = dag.create_dagrun(
state=DagRunState.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
)
scheduler_job = Job(executor=MockExecutor())
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
# We will provision 2 tasks so we can check we only find zombies from this scheduler
tasks_to_setup = ["branching", "run_this_first"]
for task_id in tasks_to_setup:
task = dag.get_task(task_id=task_id)
ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
ti.queued_by_job_id = 999
local_job = Job(dag_id=ti.dag_id)
local_job.state = TaskInstanceState.FAILED
session.add(local_job)
session.flush()
ti.job_id = local_job.id
session.add(ti)
session.flush()
assert task.task_id == "run_this_first" # Make sure we have the task/ti we expect
ti.queued_by_job_id = scheduler_job.id
session.flush()
zombie_message = self.job_runner._generate_zombie_message_details(ti)
assert zombie_message == {
"DAG Id": "example_branch_operator",
"Task Id": "run_this_first",
"Run Id": "scheduled__2016-01-01T00:00:00+00:00",
}
ti.hostname = "10.10.10.10"
ti.map_index = 2
ti.external_executor_id = "abcdefg"
zombie_message = self.job_runner._generate_zombie_message_details(ti)
assert zombie_message == {
"DAG Id": "example_branch_operator",
"Task Id": "run_this_first",
"Run Id": "scheduled__2016-01-01T00:00:00+00:00",
"Hostname": "10.10.10.10",
"Map Index": 2,
"External Executor Id": "abcdefg",
}
def test_find_zombies_handle_failure_callbacks_are_correctly_passed_to_dag_processor(self):
"""
Check that the same set of failure callback with zombies are passed to the dag
file processors until the next zombie detection logic is invoked.
"""
with conf_vars({("core", "load_examples"): "False"}), create_session() as session:
dagbag = DagBag(
dag_folder=os.path.join(settings.DAGS_FOLDER, "test_example_bash_operator.py"),
read_dags_from_db=False,
)
session.query(Job).delete()
dag = dagbag.get_dag("test_example_bash_operator")
dag.sync_to_db(processor_subdir=TEST_DAG_FOLDER)
data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
dag_run = dag.create_dagrun(
state=DagRunState.RUNNING,
execution_date=DEFAULT_DATE,
run_type=DagRunType.SCHEDULED,
session=session,
data_interval=data_interval,
)
task = dag.get_task(task_id="run_this_last")
ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
local_job = Job(dag_id=ti.dag_id)
LocalTaskJobRunner(job=local_job, task_instance=ti)
local_job.state = JobState.FAILED
session.add(local_job)
session.flush()
# TODO: If there was an actual Relationship between TI and Job
# we wouldn't need this extra commit
session.add(ti)
ti.job_id = local_job.id
session.flush()
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner.processor_agent = mock.MagicMock()
self.job_runner._find_zombies()
scheduler_job.executor.callback_sink.send.assert_called_once()
expected_failure_callback_requests = [
TaskCallbackRequest(
full_filepath=dag.fileloc,
simple_task_instance=SimpleTaskInstance.from_ti(ti),
processor_subdir=TEST_DAG_FOLDER,
msg=str(self.job_runner._generate_zombie_message_details(ti)),
)
]
callback_requests = scheduler_job.executor.callback_sink.send.call_args.args
assert len(callback_requests) == 1
assert {zombie.simple_task_instance.key for zombie in expected_failure_callback_requests} == {
result.simple_task_instance.key for result in callback_requests
}
expected_failure_callback_requests[0].simple_task_instance = None
callback_requests[0].simple_task_instance = None
assert expected_failure_callback_requests[0] == callback_requests[0]
def test_cleanup_stale_dags(self):
dagbag = DagBag(TEST_DAG_FOLDER, read_dags_from_db=False)
with create_session() as session:
dag = dagbag.get_dag("test_example_bash_operator")
dag.sync_to_db()
dm = DagModel.get_current("test_example_bash_operator")
# Make it "stale".
dm.last_parsed_time = timezone.utcnow() - timedelta(minutes=11)
session.merge(dm)
# This one should remain active.
dag = dagbag.get_dag("test_start_date_scheduling")
dag.sync_to_db()
session.flush()
scheduler_job = Job(executor=MockExecutor())
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
assert active_dag_count == 2
self.job_runner._cleanup_stale_dags(session)
session.flush()
active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
assert active_dag_count == 1
@mock.patch.object(settings, "USE_JOB_SCHEDULE", False)
def run_scheduler_until_dagrun_terminal(self):
"""
Run a scheduler until any dag run reaches a terminal state, or the scheduler becomes "idle".
This needs a DagRun to be pre-created (it can be in running or queued state) as no more will be
created as we turn off creating new DagRuns via setting USE_JOB_SCHEDULE to false
Note: This doesn't currently account for tasks that go into retry -- the scheduler would be detected
as idle in that circumstance
"""
# Spy on _do_scheduling and _process_executor_events so we can notice
# if nothing happened, and abort early! Given we are using
# SequentialExecutor this shouldn't be possible -- if there is nothing
# to schedule and no events, it means we have stalled.
def spy_on_return(orig, result):
def spy(*args, **kwargs):
ret = orig(*args, **kwargs)
result.append(ret)
return ret
return spy
num_queued_tis: deque[int] = deque([], 3)
num_finished_events: deque[int] = deque([], 3)
do_scheduling_spy = mock.patch.object(
self.job_runner,
"_do_scheduling",
side_effect=spy_on_return(self.job_runner._do_scheduling, num_queued_tis),
)
executor_events_spy = mock.patch.object(
self.job_runner,
"_process_executor_events",
side_effect=spy_on_return(self.job_runner._process_executor_events, num_finished_events),
)
orig_set_state = DagRun.set_state
def watch_set_state(dr: DagRun, state, **kwargs):
if state in (DagRunState.SUCCESS, DagRunState.FAILED):
# Stop the scheduler
self.job_runner.num_runs = 1 # type: ignore[attr-defined]
orig_set_state(dr, state, **kwargs) # type: ignore[call-arg]
def watch_heartbeat(*args, **kwargs):
if len(num_queued_tis) < 3 or len(num_finished_events) < 3:
return
queued_any_tis = any(val > 0 for val in num_queued_tis)
finished_any_events = any(val > 0 for val in num_finished_events)
assert (
queued_any_tis or finished_any_events
), "Scheduler has stalled without setting the DagRun state!"
set_state_spy = mock.patch.object(DagRun, "set_state", new=watch_set_state)
heartbeat_spy = mock.patch.object(self.job_runner.job, "heartbeat", new=watch_heartbeat)
# with heartbeat_spy, set_state_spy, do_scheduling_spy, executor_events_spy:
with heartbeat_spy, set_state_spy, do_scheduling_spy, executor_events_spy:
run_job(self.job_runner.job, execute_callable=self.job_runner._execute)
@pytest.mark.long_running
@pytest.mark.parametrize("dag_id", ["test_mapped_classic", "test_mapped_taskflow"])
def test_mapped_dag(self, dag_id, session):
"""End-to-end test of a simple mapped dag"""
# Use SequentialExecutor for more predictable test behaviour
from airflow.executors.sequential_executor import SequentialExecutor
self.dagbag.process_file(str(TEST_DAGS_FOLDER / f"{dag_id}.py"))
dag = self.dagbag.get_dag(dag_id)
assert dag
logical_date = timezone.coerce_datetime(timezone.utcnow() - datetime.timedelta(days=2))
data_interval = dag.infer_automated_data_interval(logical_date)
dr = dag.create_dagrun(
run_id=f"{dag_id}_1",
run_type=DagRunType.MANUAL,
start_date=timezone.utcnow(),
state=State.RUNNING,
session=session,
data_interval=data_interval,
)
executor = SequentialExecutor()
job = Job(executor=executor)
self.job_runner = SchedulerJobRunner(job=job, subdir=dag.fileloc)
self.run_scheduler_until_dagrun_terminal()
dr.refresh_from_db(session)
assert dr.state == DagRunState.SUCCESS
def test_should_mark_empty_task_as_success(self):
dag_file = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "../dags/test_only_empty_tasks.py"
)
# Write DAGs to dag and serialized_dag table
dagbag = DagBag(dag_folder=dag_file, include_examples=False, read_dags_from_db=False)
dagbag.sync_to_db()
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner.processor_agent = mock.MagicMock()
dag = self.job_runner.dagbag.get_dag("test_only_empty_tasks")
# Create DagRun
session = settings.Session()
orm_dag = session.get(DagModel, dag.dag_id)
self.job_runner._create_dag_runs([orm_dag], session)
drs = DagRun.find(dag_id=dag.dag_id, session=session)
assert len(drs) == 1
dr = drs[0]
# Schedule TaskInstances
self.job_runner._schedule_dag_run(dr, session)
with create_session() as session:
tis = session.query(TaskInstance).all()
dags = self.job_runner.dagbag.dags.values()
assert ["test_only_empty_tasks"] == [dag.dag_id for dag in dags]
assert 6 == len(tis)
assert {
("test_task_a", "success"),
("test_task_b", None),
("test_task_c", "success"),
("test_task_on_execute", "scheduled"),
("test_task_on_success", "scheduled"),
("test_task_outlets", "scheduled"),
} == {(ti.task_id, ti.state) for ti in tis}
for state, start_date, end_date, duration in [
(ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
]:
if state == "success":
assert start_date is not None
assert end_date is not None
assert 0.0 == duration
else:
assert start_date is None
assert end_date is None
assert duration is None
self.job_runner._schedule_dag_run(dr, session)
with create_session() as session:
tis = session.query(TaskInstance).all()
assert 6 == len(tis)
assert {
("test_task_a", "success"),
("test_task_b", "success"),
("test_task_c", "success"),
("test_task_on_execute", "scheduled"),
("test_task_on_success", "scheduled"),
("test_task_outlets", "scheduled"),
} == {(ti.task_id, ti.state) for ti in tis}
for state, start_date, end_date, duration in [
(ti.state, ti.start_date, ti.end_date, ti.duration) for ti in tis
]:
if state == "success":
assert start_date is not None
assert end_date is not None
assert 0.0 == duration
else:
assert start_date is None
assert end_date is None
assert duration is None
@pytest.mark.need_serialized_dag
def test_catchup_works_correctly(self, dag_maker):
"""Test that catchup works correctly"""
session = settings.Session()
with dag_maker(
dag_id="test_catchup_schedule_dag",
schedule=timedelta(days=1),
start_date=DEFAULT_DATE,
catchup=True,
max_active_runs=1,
session=session,
) as dag:
EmptyOperator(task_id="dummy")
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner.processor_agent = mock.MagicMock(spec=DagFileProcessorAgent)
self.job_runner._create_dag_runs([dag_maker.dag_model], session)
self.job_runner._start_queued_dagruns(session)
# first dagrun execution date is DEFAULT_DATE 2016-01-01T00:00:00+00:00
dr = DagRun.find(execution_date=DEFAULT_DATE, session=session)[0]
ti = dr.get_task_instance(task_id="dummy")
ti.state = State.SUCCESS
session.merge(ti)
session.flush()
self.job_runner._schedule_dag_run(dr, session)
session.flush()
# Run the second time so _update_dag_next_dagrun will run
self.job_runner._schedule_dag_run(dr, session)
session.flush()
dag.catchup = False
dag.sync_to_db()
assert not dag.catchup
dm = DagModel.get_dagmodel(dag.dag_id)
self.job_runner._create_dag_runs([dm], session)
# Check catchup worked correctly by ensuring execution_date is quite new
# Our dag is a daily dag
assert (
session.query(DagRun.execution_date)
.filter(DagRun.execution_date != DEFAULT_DATE) # exclude the first run
.scalar()
) > (timezone.utcnow() - timedelta(days=2))
def test_update_dagrun_state_for_paused_dag(self, dag_maker, session):
"""Test that _update_dagrun_state_for_paused_dag puts DagRuns in terminal states"""
with dag_maker("testdag") as dag:
EmptyOperator(task_id="task1")
scheduled_run = dag_maker.create_dagrun(
execution_date=datetime.datetime(2022, 1, 1),
run_type=DagRunType.SCHEDULED,
)
scheduled_run.last_scheduling_decision = datetime.datetime.now(timezone.utc) - timedelta(minutes=1)
ti = scheduled_run.get_task_instances()[0]
ti.set_state(TaskInstanceState.RUNNING)
dm = DagModel.get_dagmodel(dag.dag_id)
dm.is_paused = True
session.merge(dm)
session.merge(ti)
session.flush()
assert scheduled_run.state == State.RUNNING
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner._update_dag_run_state_for_paused_dags(session=session)
session.flush()
# TI still running, DagRun left in running
(scheduled_run,) = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.SCHEDULED, session=session)
assert scheduled_run.state == State.RUNNING
prior_last_scheduling_decision = scheduled_run.last_scheduling_decision
# Make sure we don't constantly try dagruns over and over
self.job_runner._update_dag_run_state_for_paused_dags(session=session)
(scheduled_run,) = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.SCHEDULED, session=session)
assert scheduled_run.state == State.RUNNING
# last_scheduling_decision is bumped by update_state, so check that to determine if we tried again
assert prior_last_scheduling_decision == scheduled_run.last_scheduling_decision
# Once the TI is in a terminal state though, DagRun goes to success
ti.set_state(TaskInstanceState.SUCCESS)
self.job_runner._update_dag_run_state_for_paused_dags(session=session)
(scheduled_run,) = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.SCHEDULED, session=session)
assert scheduled_run.state == State.SUCCESS
def test_update_dagrun_state_for_paused_dag_not_for_backfill(self, dag_maker, session):
"""Test that the _update_dagrun_state_for_paused_dag does not affect backfilled dagruns"""
with dag_maker("testdag") as dag:
EmptyOperator(task_id="task1")
# Backfill run
backfill_run = dag_maker.create_dagrun(run_type=DagRunType.BACKFILL_JOB)
backfill_run.last_scheduling_decision = datetime.datetime.now(timezone.utc) - timedelta(minutes=1)
ti = backfill_run.get_task_instances()[0]
ti.set_state(TaskInstanceState.SUCCESS)
dm = DagModel.get_dagmodel(dag.dag_id)
dm.is_paused = True
session.merge(dm)
session.merge(ti)
session.flush()
assert backfill_run.state == State.RUNNING
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
scheduler_job.executor = MockExecutor()
self.job_runner._update_dag_run_state_for_paused_dags()
session.flush()
(backfill_run,) = DagRun.find(dag_id=dag.dag_id, run_type=DagRunType.BACKFILL_JOB, session=session)
assert backfill_run.state == State.RUNNING
def test_dataset_orphaning(self, dag_maker, session):
dataset1 = Dataset(uri="ds1")
dataset2 = Dataset(uri="ds2")
dataset3 = Dataset(uri="ds3")
dataset4 = Dataset(uri="ds4")
with dag_maker(dag_id="datasets-1", schedule=[dataset1, dataset2], session=session):
BashOperator(task_id="task", bash_command="echo 1", outlets=[dataset3, dataset4])
non_orphaned_dataset_count = session.query(DatasetModel).filter(~DatasetModel.is_orphaned).count()
assert non_orphaned_dataset_count == 4
orphaned_dataset_count = session.query(DatasetModel).filter(DatasetModel.is_orphaned).count()
assert orphaned_dataset_count == 0
# now remove 2 dataset references
with dag_maker(dag_id="datasets-1", schedule=[dataset1], session=session):
BashOperator(task_id="task", bash_command="echo 1", outlets=[dataset3])
scheduler_job = Job()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
self.job_runner._orphan_unreferenced_datasets(session=session)
session.flush()
# and find the orphans
non_orphaned_datasets = [
dataset.uri
for dataset in session.query(DatasetModel.uri)
.filter(~DatasetModel.is_orphaned)
.order_by(DatasetModel.uri)
]
assert non_orphaned_datasets == ["ds1", "ds3"]
orphaned_datasets = [
dataset.uri
for dataset in session.query(DatasetModel.uri)
.filter(DatasetModel.is_orphaned)
.order_by(DatasetModel.uri)
]
assert orphaned_datasets == ["ds2", "ds4"]
def test_misconfigured_dags_doesnt_crash_scheduler(self, session, dag_maker, caplog):
"""Test that if dagrun creation throws an exception, the scheduler doesn't crash"""
with dag_maker("testdag1", serialized=True):
BashOperator(task_id="task", bash_command="echo 1")
dm1 = dag_maker.dag_model
# Here, the next_dagrun is set to None, which will cause an exception
dm1.next_dagrun = None
session.add(dm1)
session.flush()
with dag_maker("testdag2", serialized=True):
BashOperator(task_id="task", bash_command="echo 1")
dm2 = dag_maker.dag_model
scheduler_job = Job()
job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
# In the dagmodel list, the first dag should fail, but the second one should succeed
job_runner._create_dag_runs([dm1, dm2], session)
assert "Failed creating DagRun for testdag1" in caplog.text
assert not DagRun.find(dag_id="testdag1", session=session)
# Check if the second dagrun was created
assert DagRun.find(dag_id="testdag2", session=session)
@pytest.mark.need_serialized_dag
def test_schedule_dag_run_with_upstream_skip(dag_maker, session):
"""
Test if _schedule_dag_run puts a task instance into SKIPPED state if any of its
upstream tasks are skipped according to TriggerRuleDep.
"""
with dag_maker(
dag_id="test_task_with_upstream_skip_process_task_instances",
start_date=DEFAULT_DATE,
session=session,
):
dummy1 = EmptyOperator(task_id="dummy1")
dummy2 = EmptyOperator(task_id="dummy2")
dummy3 = EmptyOperator(task_id="dummy3")
[dummy1, dummy2] >> dummy3
# dag_file_processor = DagFileProcessor(dag_ids=[], log=mock.MagicMock())
dr = dag_maker.create_dagrun(state=State.RUNNING)
assert dr is not None
tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
# Set dummy1 to skipped and dummy2 to success. dummy3 remains as none.
tis[dummy1.task_id].state = State.SKIPPED
tis[dummy2.task_id].state = State.SUCCESS
assert tis[dummy3.task_id].state == State.NONE
session.flush()
# dag_runs = DagRun.find(dag_id='test_task_with_upstream_skip_dag')
# dag_file_processor._process_task_instances(dag, dag_runs=dag_runs)
scheduler_job = Job()
job_runner = SchedulerJobRunner(job=scheduler_job, subdir=os.devnull)
job_runner._schedule_dag_run(dr, session)
session.flush()
tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
assert tis[dummy1.task_id].state == State.SKIPPED
assert tis[dummy2.task_id].state == State.SUCCESS
# dummy3 should be skipped because dummy1 is skipped.
assert tis[dummy3.task_id].state == State.SKIPPED
class TestSchedulerJobQueriesCount:
"""
These tests are designed to detect changes in the number of queries for
different DAG files. These tests allow easy detection when a change is
made that affects the performance of the SchedulerJob.
"""
scheduler_job: Job | None
@staticmethod
def clean_db():
clear_db_runs()
clear_db_pools()
clear_db_dags()
clear_db_sla_miss()
clear_db_import_errors()
clear_db_jobs()
clear_db_serialized_dags()
@pytest.fixture(autouse=True)
def per_test(self) -> Generator:
self.clean_db()
yield
if self.job_runner.processor_agent: # type: ignore[attr-defined]
self.job_runner.processor_agent.end() # type: ignore[attr-defined]
self.clean_db()
@pytest.mark.parametrize(
"expected_query_count, dag_count, task_count",
[
(21, 1, 1), # One DAG with one task per DAG file.
(21, 1, 5), # One DAG with five tasks per DAG file.
(93, 10, 10), # 10 DAGs with 10 tasks per DAG file.
],
)
def test_execute_queries_count_with_harvested_dags(self, expected_query_count, dag_count, task_count):
with mock.patch.dict(
"os.environ",
{
"PERF_DAGS_COUNT": str(dag_count),
"PERF_TASKS_COUNT": str(task_count),
"PERF_START_AGO": "1d",
"PERF_SCHEDULE_INTERVAL": "30m",
"PERF_SHAPE": "no_structure",
},
), conf_vars(
{
("scheduler", "use_job_schedule"): "True",
("core", "load_examples"): "False",
# For longer running tests under heavy load, the min_serialized_dag_fetch_interval
# and min_serialized_dag_update_interval might kick-in and re-retrieve the record.
# This will increase the count of serliazied_dag.py.get() count.
# That's why we keep the values high
("core", "min_serialized_dag_update_interval"): "100",
("core", "min_serialized_dag_fetch_interval"): "100",
}
):
dagruns = []
dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False, read_dags_from_db=False)
dagbag.sync_to_db()
dag_ids = dagbag.dag_ids
dagbag = DagBag(read_dags_from_db=True)
for i, dag_id in enumerate(dag_ids):
dag = dagbag.get_dag(dag_id)
dr = dag.create_dagrun(
state=State.RUNNING,
run_id=f"{DagRunType.MANUAL.value}__{i}",
dag_hash=dagbag.dags_hash[dag.dag_id],
)
dagruns.append(dr)
for ti in dr.get_task_instances():
ti.set_state(state=State.SCHEDULED)
mock_agent = mock.MagicMock()
scheduler_job = Job(
executor=MockExecutor(do_update=False),
)
scheduler_job.heartbeat = mock.MagicMock()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=PERF_DAGS_FOLDER, num_runs=1)
self.job_runner.processor_agent = mock_agent
with assert_queries_count(expected_query_count, margin=15):
with mock.patch.object(DagRun, "next_dagruns_to_examine") as mock_dagruns:
query = MagicMock()
query.all.return_value = dagruns
mock_dagruns.return_value = query
self.job_runner._run_scheduler_loop()
@pytest.mark.parametrize(
"expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape",
[
# One DAG with one task per DAG file.
([10, 10, 10, 10], 1, 1, "1d", "None", "no_structure"),
([10, 10, 10, 10], 1, 1, "1d", "None", "linear"),
([24, 14, 14, 14], 1, 1, "1d", "@once", "no_structure"),
([24, 14, 14, 14], 1, 1, "1d", "@once", "linear"),
([24, 26, 29, 32], 1, 1, "1d", "30m", "no_structure"),
([24, 26, 29, 32], 1, 1, "1d", "30m", "linear"),
([24, 26, 29, 32], 1, 1, "1d", "30m", "binary_tree"),
([24, 26, 29, 32], 1, 1, "1d", "30m", "star"),
([24, 26, 29, 32], 1, 1, "1d", "30m", "grid"),
# One DAG with five tasks per DAG file.
([10, 10, 10, 10], 1, 5, "1d", "None", "no_structure"),
([10, 10, 10, 10], 1, 5, "1d", "None", "linear"),
([24, 14, 14, 14], 1, 5, "1d", "@once", "no_structure"),
([25, 15, 15, 15], 1, 5, "1d", "@once", "linear"),
([24, 26, 29, 32], 1, 5, "1d", "30m", "no_structure"),
([25, 28, 32, 36], 1, 5, "1d", "30m", "linear"),
([25, 28, 32, 36], 1, 5, "1d", "30m", "binary_tree"),
([25, 28, 32, 36], 1, 5, "1d", "30m", "star"),
([25, 28, 32, 36], 1, 5, "1d", "30m", "grid"),
# 10 DAGs with 10 tasks per DAG file.
([10, 10, 10, 10], 10, 10, "1d", "None", "no_structure"),
([10, 10, 10, 10], 10, 10, "1d", "None", "linear"),
([105, 38, 38, 38], 10, 10, "1d", "@once", "no_structure"),
([115, 51, 51, 51], 10, 10, "1d", "@once", "linear"),
([105, 119, 119, 119], 10, 10, "1d", "30m", "no_structure"),
([115, 145, 145, 145], 10, 10, "1d", "30m", "linear"),
([115, 139, 139, 139], 10, 10, "1d", "30m", "binary_tree"),
([115, 139, 139, 139], 10, 10, "1d", "30m", "star"),
([115, 139, 139, 139], 10, 10, "1d", "30m", "grid"),
],
)
def test_process_dags_queries_count(
self, expected_query_counts, dag_count, task_count, start_ago, schedule_interval, shape
):
with mock.patch.dict(
"os.environ",
{
"PERF_DAGS_COUNT": str(dag_count),
"PERF_TASKS_COUNT": str(task_count),
"PERF_START_AGO": start_ago,
"PERF_SCHEDULE_INTERVAL": schedule_interval,
"PERF_SHAPE": shape,
},
), conf_vars(
{
("scheduler", "use_job_schedule"): "True",
# For longer running tests under heavy load, the min_serialized_dag_fetch_interval
# and min_serialized_dag_update_interval might kick-in and re-retrieve the record.
# This will increase the count of serliazied_dag.py.get() count.
# That's why we keep the values high
("core", "min_serialized_dag_update_interval"): "100",
("core", "min_serialized_dag_fetch_interval"): "100",
}
):
dagbag = DagBag(dag_folder=ELASTIC_DAG_FILE, include_examples=False)
dagbag.sync_to_db()
mock_agent = mock.MagicMock()
scheduler_job = Job(job_type=SchedulerJobRunner.job_type, executor=MockExecutor(do_update=False))
scheduler_job.heartbeat = mock.MagicMock()
self.job_runner = SchedulerJobRunner(job=scheduler_job, subdir=PERF_DAGS_FOLDER, num_runs=1)
self.job_runner.processor_agent = mock_agent
failures = [] # Collects assertion errors and report all of them at the end.
message = "Expected {expected_count} query, but got {current_count} located at:"
for expected_query_count in expected_query_counts:
with create_session() as session:
try:
with assert_queries_count(expected_query_count, message_fmt=message, margin=15):
self.job_runner._do_scheduling(session)
except AssertionError as e:
failures.append(str(e))
if failures:
prefix = "Collected database query count mismatches:"
joined = "\n\n".join(failures)
raise AssertionError(f"{prefix}\n\n{joined}")