| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import annotations |
| |
| import json |
| import logging |
| import os |
| import random |
| import re |
| import shutil |
| import signal |
| import textwrap |
| import time |
| from collections import deque |
| from datetime import datetime, timedelta |
| from pathlib import Path |
| from socket import socket, socketpair |
| from unittest import mock |
| from unittest.mock import MagicMock |
| |
| import msgspec |
| import pytest |
| import time_machine |
| from sqlalchemy import func, select |
| from uuid6 import uuid7 |
| |
| from airflow._shared.timezones import timezone |
| from airflow.callbacks.callback_requests import DagCallbackRequest |
| from airflow.dag_processing.bundles.manager import DagBundlesManager |
| from airflow.dag_processing.dagbag import DagBag |
| from airflow.dag_processing.manager import ( |
| DagFileInfo, |
| DagFileProcessorManager, |
| DagFileStat, |
| ) |
| from airflow.dag_processing.processor import DagFileProcessorProcess |
| from airflow.models import DagModel, DbCallbackRequest |
| from airflow.models.asset import TaskOutletAssetReference |
| from airflow.models.dag_version import DagVersion |
| from airflow.models.dagbundle import DagBundleModel |
| from airflow.models.dagcode import DagCode |
| from airflow.models.serialized_dag import SerializedDagModel |
| from airflow.models.team import Team |
| from airflow.utils.net import get_hostname |
| from airflow.utils.session import create_session |
| |
| from tests_common.test_utils.compat import ParseImportError |
| from tests_common.test_utils.config import conf_vars |
| from tests_common.test_utils.dag import sync_dag_to_db |
| from tests_common.test_utils.db import ( |
| clear_db_assets, |
| clear_db_callbacks, |
| clear_db_dag_bundles, |
| clear_db_dags, |
| clear_db_import_errors, |
| clear_db_runs, |
| clear_db_serialized_dags, |
| clear_db_teams, |
| ) |
| from unit.models import TEST_DAGS_FOLDER |
| |
| pytestmark = pytest.mark.db_test |
| |
| logger = logging.getLogger(__name__) |
| TEST_DAG_FOLDER = Path(__file__).parents[1].resolve() / "dags" |
| DEFAULT_DATE = timezone.datetime(2016, 1, 1) |
| |
| |
| def _get_file_infos(files: list[str | Path]) -> list[DagFileInfo]: |
| return [DagFileInfo(bundle_name="testing", bundle_path=TEST_DAGS_FOLDER, rel_path=Path(f)) for f in files] |
| |
| |
| def mock_get_mtime(file: Path): |
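| """Fake os.path.getmtime: return the mtime encoded in the filename as '-ss=<value>' by encode_mtime_in_filename.""" |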
| f = str(file) |
| m = re.match(pattern=r".*ss=(.+?)\.\w+", string=f) |
| if not m: |
| raise ValueError(f"unexpected: {file}") |
| match = m.group(1) |
| if match == "<class 'FileNotFoundError'>": |
| raise FileNotFoundError() |
| try: |
| return int(match) |
| except Exception: |
| raise ValueError(f"could not convert value {match} to int") |
| |
| |
| def encode_mtime_in_filename(val): |
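| """Encode each (filename, mtime) pair's mtime into the file stem as '-ss=<mtime>' so mock_get_mtime can recover it.""" |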
| from pathlib import PurePath |
| |
| out = [] |
| for fname, mtime in val: |
| f = PurePath(PurePath(fname).name) |
| addition = f"ss={str(mtime)}" |
| out.append(f"{f.stem}-{addition}{f.suffix}") |
| return out |
| |
| |
| class TestDagFileProcessorManager: |
| @pytest.fixture(autouse=True) |
| def _disable_examples(self): |
| with conf_vars({("core", "load_examples"): "False"}): |
| yield |
| |
| def setup_method(self): |
| clear_db_teams() |
| clear_db_assets() |
| clear_db_runs() |
| clear_db_serialized_dags() |
| clear_db_dags() |
| clear_db_callbacks() |
| clear_db_import_errors() |
| clear_db_dag_bundles() |
| |
| def teardown_class(self): |
| clear_db_teams() |
| clear_db_assets() |
| clear_db_runs() |
| clear_db_serialized_dags() |
| clear_db_dags() |
| clear_db_callbacks() |
| clear_db_import_errors() |
| clear_db_dag_bundles() |
| |
| def mock_processor(self, start_time: float | None = None) -> tuple[DagFileProcessorProcess, socket]: |
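| """Create a DagFileProcessorProcess wired to mocks and a socketpair; return the wrapper plus the read end of the socket so tests can inspect what gets written to the child's stdin.""" |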
| proc = MagicMock() |
| logger_filehandle = MagicMock() |
| proc.create_time.return_value = time.time() |
| proc.wait.return_value = 0 |
| read_end, write_end = socketpair() |
| ret = DagFileProcessorProcess( |
| process_log=MagicMock(), |
| id=uuid7(), |
| pid=1234, |
| process=proc, |
| stdin=write_end, |
| logger_filehandle=logger_filehandle, |
| client=MagicMock(), |
| ) |
| if start_time: |
| ret.start_time = start_time |
| ret._open_sockets.clear() |
| return ret, read_end |
| |
| @pytest.fixture |
| def clear_parse_import_errors(self): |
| clear_db_import_errors() |
| |
| @pytest.mark.usefixtures("clear_parse_import_errors") |
| @conf_vars({("core", "load_examples"): "False"}) |
| def test_remove_file_clears_import_error(self, tmp_path, configure_testing_dag_bundle): |
| path_to_parse = tmp_path / "temp_dag.py" |
| |
| # Generate original import error |
| path_to_parse.write_text("an invalid airflow DAG") |
| |
| with configure_testing_dag_bundle(path_to_parse): |
| manager = DagFileProcessorManager( |
| max_runs=1, |
| processor_timeout=365 * 86_400, |
| ) |
| |
| manager.run() |
| |
| with create_session() as session: |
| import_errors = session.query(ParseImportError).all() |
| assert len(import_errors) == 1 |
| |
| path_to_parse.unlink() |
| |
| # Rerun the parser once the dag file has been removed |
| manager.run() |
| |
| with create_session() as session: |
| import_errors = session.query(ParseImportError).all() |
| |
| assert len(import_errors) == 0 |
| session.rollback() |
| |
| @conf_vars({("core", "load_examples"): "False"}) |
| def test_max_runs_when_no_files(self, tmp_path): |
| with conf_vars({("core", "dags_folder"): str(tmp_path)}): |
| manager = DagFileProcessorManager(max_runs=1) |
| manager.run() |
| |
| # TODO: AIP-66 no asserts? |
| |
| def test_start_new_processes_with_same_filepath(self, configure_testing_dag_bundle): |
| """ |
| Test that when a processor already exists for a file path, a new processor won't be created |
| for that file path; the file path is simply removed from the queue. |
| """ |
| with configure_testing_dag_bundle("/tmp"): |
| manager = DagFileProcessorManager(max_runs=1) |
| manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles()) |
| |
| file_1 = DagFileInfo(bundle_name="testing", rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER) |
| file_2 = DagFileInfo(bundle_name="testing", rel_path=Path("file_2.py"), bundle_path=TEST_DAGS_FOLDER) |
| file_3 = DagFileInfo(bundle_name="testing", rel_path=Path("file_3.py"), bundle_path=TEST_DAGS_FOLDER) |
| manager._file_queue = deque([file_1, file_2, file_3]) |
| |
| # Mock that only one processor exists. This processor runs with 'file_1' |
| manager._processors[file_1] = MagicMock() |
| # Start New Processes |
| with mock.patch.object(DagFileProcessorManager, "_create_process"): |
| manager._start_new_processes() |
| |
| # With the config '[dag_processor] parsing_processes = 2', only one extra process is created. |
| # Since a processor for 'file_1' already exists, even though it is first in '_file_queue', |
| # the new processor is created for 'file_2' rather than 'file_1'. |
| |
| assert file_1 in manager._processors.keys() |
| assert file_2 in manager._processors.keys() |
| assert deque([file_3]) == manager._file_queue |
| |
| def test_handle_removed_files_when_processor_file_path_not_in_new_file_paths(self): |
| """Ensure processors and file stats are removed when the file path is not in the new file paths""" |
| manager = DagFileProcessorManager(max_runs=1) |
| bundle_name = "testing" |
| file = DagFileInfo( |
| bundle_name=bundle_name, rel_path=Path("missing_file.txt"), bundle_path=TEST_DAGS_FOLDER |
| ) |
| |
| manager._processors[file] = MagicMock() |
| manager._file_stats[file] = DagFileStat() |
| |
| manager.handle_removed_files({bundle_name: set()}) |
| assert manager._processors == {} |
| assert file not in manager._file_stats |
| |
| def test_handle_removed_files_when_processor_file_path_is_present(self): |
| """handle_removed_files should not purge files that are still present.""" |
| manager = DagFileProcessorManager(max_runs=1) |
| bundle_name = "testing" |
| file = DagFileInfo(bundle_name=bundle_name, rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER) |
| mock_processor = MagicMock() |
| |
| manager._processors[file] = mock_processor |
| |
| manager.handle_removed_files(known_files={bundle_name: {file}}) |
| assert manager._processors == {file: mock_processor} |
| |
| @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"}) |
| def test_files_in_queue_sorted_alphabetically(self): |
| """Test dag files are sorted alphabetically""" |
| file_names = ["file_3.py", "file_2.py", "file_4.py", "file_1.py"] |
| dag_files = _get_file_infos(file_names) |
| ordered_dag_files = _get_file_infos(sorted(file_names)) |
| |
| manager = DagFileProcessorManager(max_runs=1) |
| known_files = {"some-bundle": set(dag_files)} |
| assert manager._file_queue == deque() |
| manager.prepare_file_queue(known_files=known_files) |
| assert manager._file_queue == deque(ordered_dag_files) |
| |
| @conf_vars({("dag_processor", "file_parsing_sort_mode"): "random_seeded_by_host"}) |
| def test_files_sorted_random_seeded_by_host(self): |
| """Test files are randomly sorted and seeded by host name""" |
| f_infos = _get_file_infos(["file_3.py", "file_2.py", "file_4.py", "file_1.py"]) |
| known_files = {"anything": f_infos} |
| manager = DagFileProcessorManager(max_runs=1) |
| |
| assert manager._file_queue == deque() |
| manager.prepare_file_queue(known_files=known_files) # using a list rather than a set for reproducibility |
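| # Recompute the expected order by applying the same hostname-seeded shuffle the manager uses |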
| random.Random(get_hostname()).shuffle(f_infos) |
| expected = deque(f_infos) |
| assert manager._file_queue == expected |
| |
| # Verify running it again produces the same order |
| manager._files = [] |
| manager.prepare_file_queue(known_files=known_files) |
| assert manager._file_queue == expected |
| |
| @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"}) |
| @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime) |
| def test_files_sorted_by_modified_time(self): |
| """Test files are sorted by modified time""" |
| paths_with_mtime = [ |
| ("file_3.py", 3.0), |
| ("file_2.py", 2.0), |
| ("file_4.py", 5.0), |
| ("file_1.py", 4.0), |
| ] |
| filenames = encode_mtime_in_filename(paths_with_mtime) |
| dag_files = _get_file_infos(filenames) |
| |
| manager = DagFileProcessorManager(max_runs=1) |
| |
| assert manager._file_queue == deque() |
| manager.prepare_file_queue(known_files={"any": set(dag_files)}) |
| ordered_files = _get_file_infos( |
| [ |
| "file_4-ss=5.0.py", |
| "file_1-ss=4.0.py", |
| "file_3-ss=3.0.py", |
| "file_2-ss=2.0.py", |
| ] |
| ) |
| assert manager._file_queue == deque(ordered_files) |
| |
| @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"}) |
| @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime) |
| def test_queued_files_exclude_missing_file(self): |
| """Check that a file is not enqueued for processing if it has been deleted""" |
| file_and_mtime = [("file_3.py", 2.0), ("file_2.py", 3.0), ("file_4.py", FileNotFoundError)] |
| filenames = encode_mtime_in_filename(file_and_mtime) |
| file_infos = _get_file_infos(filenames) |
| manager = DagFileProcessorManager(max_runs=1) |
| manager.prepare_file_queue(known_files={"any": set(file_infos)}) |
| ordered_files = _get_file_infos(["file_2-ss=3.0.py", "file_3-ss=2.0.py"]) |
| assert manager._file_queue == deque(ordered_files) |
| |
| @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"}) |
| @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime) |
| def test_add_new_file_to_parsing_queue(self): |
| """Check that a new file is added to the parsing queue""" |
| dag_files = _get_file_infos(["file_1-ss=2.0.py", "file_2-ss=3.0.py", "file_3-ss=4.0.py"]) |
| from random import Random |
| |
| Random("file_2.py").random() |
| manager = DagFileProcessorManager(max_runs=1) |
| |
| manager.prepare_file_queue(known_files={"any": set(dag_files)}) |
| assert set(manager._file_queue) == set(dag_files) |
| |
| manager.prepare_file_queue( |
| known_files={"any": set((*dag_files, *_get_file_infos(["file_4-ss=1.0.py"])))} |
| ) |
| ordered_files = _get_file_infos( |
| [ |
| "file_3-ss=4.0.py", |
| "file_2-ss=3.0.py", |
| "file_1-ss=2.0.py", |
| "file_4-ss=1.0.py", |
| ] |
| ) |
| assert manager._file_queue == deque(ordered_files) |
| |
| @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"}) |
| @mock.patch("airflow.utils.file.os.path.getmtime") |
| def test_recently_modified_file_is_parsed_with_mtime_mode(self, mock_getmtime): |
| """ |
| Test recently updated files are processed even if min_file_process_interval is not reached |
| """ |
| freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0) |
| initial_file_1_mtime = (freezed_base_time - timedelta(minutes=5)).timestamp() |
| dag_file = DagFileInfo( |
| bundle_name="testing", rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER |
| ) |
| known_files = {"does-not-matter": {dag_file}} |
| mock_getmtime.side_effect = [initial_file_1_mtime] |
| |
| manager = DagFileProcessorManager(max_runs=3) |
| |
| # let's say the DAG was parsed 10 seconds before the frozen time |
| last_finish_time = freezed_base_time - timedelta(seconds=10) |
| manager._file_stats = { |
| dag_file: DagFileStat(1, 0, last_finish_time, 1.0, 1, 1), |
| } |
| with time_machine.travel(freezed_base_time): |
| assert manager._file_queue == deque() |
| # File Path Queue will be empty as the "modified time" < "last finish time" |
| manager.prepare_file_queue(known_files=known_files) |
| assert manager._file_queue == deque() |
| |
| # Simulate a DAG modification: the new modified time is greater than the last finish time, |
| # while less than min_file_process_interval has elapsed since the last parse |
| file_1_new_mtime = freezed_base_time - timedelta(seconds=5) |
| file_1_new_mtime_ts = file_1_new_mtime.timestamp() |
| with time_machine.travel(freezed_base_time): |
| assert manager._file_queue == deque() |
| # This time the modified time is newer than the last finish time |
| mock_getmtime.side_effect = [file_1_new_mtime_ts] |
| manager.prepare_file_queue(known_files=known_files) |
| # Check that the file is added to the queue even though it was parsed only recently |
| assert manager._file_queue == deque([dag_file]) |
| assert last_finish_time < file_1_new_mtime |
| assert ( |
| manager._file_process_interval |
| > (freezed_base_time - manager._file_stats[dag_file].last_finish_time).total_seconds() |
| ) |
| |
| def test_file_paths_in_queue_sorted_by_priority(self): |
| from airflow.models.dagbag import DagPriorityParsingRequest |
| |
| parsing_request = DagPriorityParsingRequest(relative_fileloc="file_1.py", bundle_name="dags-folder") |
| with create_session() as session: |
| session.add(parsing_request) |
| session.commit() |
| |
| file1 = DagFileInfo( |
| bundle_name="dags-folder", rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER |
| ) |
| file2 = DagFileInfo( |
| bundle_name="dags-folder", rel_path=Path("file_2.py"), bundle_path=TEST_DAGS_FOLDER |
| ) |
| |
| manager = DagFileProcessorManager(max_runs=1) |
| manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles()) |
| manager._file_queue = deque([file2, file1]) |
| manager._queue_requested_files_for_parsing() |
| assert manager._file_queue == deque([file1, file2]) |
| assert manager._force_refresh_bundles == {"dags-folder"} |
| with create_session() as session2: |
| parsing_request_after = session2.get(DagPriorityParsingRequest, parsing_request.id) |
| assert parsing_request_after is None |
| |
| def test_parsing_requests_only_bundles_being_parsed(self, testing_dag_bundle): |
| """Ensure the manager only handles parsing requests for bundles being parsed in this manager""" |
| from airflow.models.dagbag import DagPriorityParsingRequest |
| |
| with create_session() as session: |
| session.add(DagPriorityParsingRequest(relative_fileloc="file_1.py", bundle_name="dags-folder")) |
| session.add(DagPriorityParsingRequest(relative_fileloc="file_x.py", bundle_name="testing")) |
| session.commit() |
| |
| file1 = DagFileInfo( |
| bundle_name="dags-folder", rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER |
| ) |
| |
| manager = DagFileProcessorManager(max_runs=1) |
| manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles()) |
| manager._queue_requested_files_for_parsing() |
| assert manager._file_queue == deque([file1]) |
| with create_session() as session2: |
| parsing_request_after = session2.query(DagPriorityParsingRequest).all() |
| assert len(parsing_request_after) == 1 |
| assert parsing_request_after[0].relative_fileloc == "file_x.py" |
| |
| @pytest.mark.usefixtures("testing_dag_bundle") |
| def test_scan_stale_dags(self, session): |
| """ |
| Ensure that DAGs are marked inactive when the file is parsed but the |
| DagModel.last_parsed_time is not updated. |
| """ |
| manager = DagFileProcessorManager( |
| max_runs=1, |
| processor_timeout=10 * 60, |
| ) |
| bundle = MagicMock() |
| bundle.name = "testing" |
| manager._dag_bundles = [bundle] |
| |
| test_dag_path = DagFileInfo( |
| bundle_name="testing", |
| rel_path=Path("test_example_bash_operator.py"), |
| bundle_path=TEST_DAGS_FOLDER, |
| ) |
| dagbag = DagBag( |
| test_dag_path.absolute_path, |
| include_examples=False, |
| bundle_path=test_dag_path.bundle_path, |
| ) |
| |
| # Add stale DAG to the DB |
| dag = dagbag.get_dag("test_example_bash_operator") |
| sync_dag_to_db(dag, session=session) |
| |
| # Add DAG to the file_parsing_stats |
| stat = DagFileStat( |
| num_dags=1, |
| import_errors=0, |
| last_finish_time=timezone.utcnow() + timedelta(hours=1), |
| last_duration=1, |
| run_count=1, |
| last_num_of_db_queries=1, |
| ) |
| manager._files = [test_dag_path] |
| manager._file_stats[test_dag_path] = stat |
| |
| active_dag_count = ( |
| session.query(func.count(DagModel.dag_id)) |
| .filter( |
| ~DagModel.is_stale, |
| DagModel.relative_fileloc == str(test_dag_path.rel_path), |
| DagModel.bundle_name == test_dag_path.bundle_name, |
| ) |
| .scalar() |
| ) |
| assert active_dag_count == 1 |
| |
| manager._scan_stale_dags() |
| |
| active_dag_count = ( |
| session.query(func.count(DagModel.dag_id)) |
| .filter( |
| ~DagModel.is_stale, |
| DagModel.relative_fileloc == str(test_dag_path.rel_path), |
| DagModel.bundle_name == test_dag_path.bundle_name, |
| ) |
| .scalar() |
| ) |
| assert active_dag_count == 0 |
| |
| serialized_dag_count = ( |
| session.query(func.count(SerializedDagModel.dag_id)) |
| .filter(SerializedDagModel.dag_id == dag.dag_id) |
| .scalar() |
| ) |
| # Deactivating the DagModel should not delete the SerializedDagModel |
| # SerializedDagModel gives history about Dags |
| assert serialized_dag_count == 1 |
| |
| def test_kill_timed_out_processors_kill(self): |
| manager = DagFileProcessorManager(max_runs=1, processor_timeout=5) |
| # Set start_time so the processor appears to have been running for (timeout + 1) seconds, guaranteeing a timeout |
| start_time = time.monotonic() - manager.processor_timeout - 1 |
| processor, _ = self.mock_processor(start_time=start_time) |
| manager._processors = { |
| DagFileInfo( |
| bundle_name="testing", rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER |
| ): processor |
| } |
| with mock.patch.object(type(processor), "kill") as mock_kill: |
| manager._kill_timed_out_processors() |
| mock_kill.assert_called_once_with(signal.SIGKILL) |
| assert len(manager._processors) == 0 |
| processor.logger_filehandle.close.assert_called() |
| |
| def test_kill_timed_out_processors_no_kill(self): |
| manager = DagFileProcessorManager( |
| max_runs=1, |
| processor_timeout=5, |
| ) |
| |
| processor, _ = self.mock_processor() |
| processor._process.create_time.return_value = timezone.make_aware(datetime.max).timestamp() |
| manager._processors = { |
| DagFileInfo( |
| bundle_name="testing", rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER |
| ): processor |
| } |
| with mock.patch.object(type(processor), "kill") as mock_kill: |
| manager._kill_timed_out_processors() |
| mock_kill.assert_not_called() |
| |
| @pytest.mark.usefixtures("testing_dag_bundle") |
| @pytest.mark.parametrize( |
| ["callbacks", "path", "expected_body"], |
| [ |
| pytest.param( |
| [], |
| "/opt/airflow/dags/test_dag.py", |
| { |
| "file": "/opt/airflow/dags/test_dag.py", |
| "bundle_path": "/opt/airflow/dags", |
| "callback_requests": [], |
| "type": "DagFileParseRequest", |
| }, |
| ), |
| pytest.param( |
| [ |
| DagCallbackRequest( |
| filepath="dag_callback_dag.py", |
| dag_id="dag_id", |
| run_id="run_id", |
| bundle_name="testing", |
| bundle_version=None, |
| context_from_server=None, |
| is_failure_callback=False, |
| ) |
| ], |
| "/opt/airflow/dags/dag_callback_dag.py", |
| { |
| "file": "/opt/airflow/dags/dag_callback_dag.py", |
| "bundle_path": "/opt/airflow/dags", |
| "callback_requests": [ |
| { |
| "filepath": "dag_callback_dag.py", |
| "bundle_name": "testing", |
| "bundle_version": None, |
| "msg": None, |
| "dag_id": "dag_id", |
| "run_id": "run_id", |
| "context_from_server": None, |
| "is_failure_callback": False, |
| "type": "DagCallbackRequest", |
| } |
| ], |
| "type": "DagFileParseRequest", |
| }, |
| ), |
| ], |
| ) |
| def test_serialize_callback_requests(self, callbacks, path, expected_body): |
| from airflow.sdk.execution_time.comms import _ResponseFrame |
| |
| processor, read_socket = self.mock_processor() |
| processor._on_child_started(callbacks, path, bundle_path=Path("/opt/airflow/dags")) |
| |
| read_socket.settimeout(0.1) |
| # Read response from the read end of the socket |
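| # Each frame is a 4-byte big-endian length prefix followed by a msgpack-encoded _ResponseFrame |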
| frame_len = int.from_bytes(read_socket.recv(4), "big") |
| frame_bytes = read_socket.recv(frame_len) |
| frame = msgspec.msgpack.Decoder(_ResponseFrame).decode(frame_bytes) |
| |
| assert frame.body == expected_body |
| |
| @conf_vars({("core", "load_examples"): "False"}) |
| @pytest.mark.execution_timeout(10) |
| def test_dag_with_system_exit(self, configure_testing_dag_bundle): |
| """ |
| Test that a DAG file calling sys.exit() doesn't break the scheduler. |
| """ |
| dag_id = "exit_test_dag" |
| dag_directory = TEST_DAG_FOLDER.parent / "dags_with_system_exit" |
| |
| # Delete the one valid DAG/SerializedDAG, and check that it gets re-created |
| clear_db_dags() |
| clear_db_serialized_dags() |
| |
| with configure_testing_dag_bundle(dag_directory): |
| manager = DagFileProcessorManager(max_runs=1) |
| manager.run() |
| |
| # Three files in folder should be processed |
| assert sum(stat.run_count for stat in manager._file_stats.values()) == 3 |
| |
| with create_session() as session: |
| assert session.get(DagModel, dag_id) is not None |
| |
| @conf_vars({("core", "load_examples"): "False"}) |
| @mock.patch("airflow.dag_processing.manager.Stats.timing") |
| @pytest.mark.skip("AIP-66: stats are not implemented yet") |
| def test_send_file_processing_statsd_timing( |
| self, statsd_timing_mock, tmp_path, configure_testing_dag_bundle |
| ): |
| path_to_parse = tmp_path / "temp_dag.py" |
| dag_code = textwrap.dedent( |
| """ |
| from airflow import DAG |
| dag = DAG(dag_id='temp_dag') |
| """ |
| ) |
| path_to_parse.write_text(dag_code) |
| |
| with configure_testing_dag_bundle(tmp_path): |
| manager = DagFileProcessorManager(max_runs=1) |
| manager.run() |
| |
| last_runtime = manager._file_stats[os.fspath(path_to_parse)].last_duration |
| statsd_timing_mock.assert_has_calls( |
| [ |
| mock.call("dag_processing.last_duration.temp_dag", last_runtime), |
| mock.call("dag_processing.last_duration", last_runtime, tags={"file_name": "temp_dag"}), |
| ], |
| any_order=True, |
| ) |
| |
| @pytest.mark.usefixtures("testing_dag_bundle") |
| def test_refresh_dags_dir_doesnt_delete_zipped_dags( |
| self, tmp_path, session, configure_testing_dag_bundle, test_zip_path |
| ): |
| """Test DagFileProcessorManager._refresh_dag_dir method""" |
| dagbag = DagBag(dag_folder=tmp_path, include_examples=False) |
| dagbag.process_file(test_zip_path) |
| dag = dagbag.get_dag("test_zip_dag") |
| sync_dag_to_db(dag) |
| |
| with configure_testing_dag_bundle(test_zip_path): |
| manager = DagFileProcessorManager(max_runs=1) |
| manager.run() |
| |
| # Assert dag not deleted in SDM |
| assert SerializedDagModel.has_dag("test_zip_dag") |
| # assert code not deleted |
| assert DagCode.has_dag(dag.dag_id) |
| # assert dag still active |
| assert session.get(DagModel, dag.dag_id).is_stale is False |
| |
| @pytest.mark.usefixtures("testing_dag_bundle") |
| def test_refresh_dags_dir_deactivates_deleted_zipped_dags( |
| self, session, tmp_path, configure_testing_dag_bundle, test_zip_path |
| ): |
| """Test DagFileProcessorManager._refresh_dag_dir method""" |
| dag_id = "test_zip_dag" |
| filename = "test_zip.zip" |
| source_location = test_zip_path |
| bundle_path = Path(tmp_path, "test_refresh_dags_dir_deactivates_deleted_zipped_dags") |
| bundle_path.mkdir(exist_ok=True) |
| zip_dag_path = bundle_path / filename |
| shutil.copy(source_location, zip_dag_path) |
| |
| with configure_testing_dag_bundle(bundle_path): |
| session.commit() |
| manager = DagFileProcessorManager(max_runs=1) |
| manager.run() |
| |
| assert SerializedDagModel.has_dag(dag_id) |
| assert DagCode.has_dag(dag_id) |
| assert DagVersion.get_latest_version(dag_id) |
| dag = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id)) |
| assert dag.is_stale is False |
| |
| os.remove(zip_dag_path) |
| |
| manager.run() |
| |
| assert SerializedDagModel.has_dag(dag_id) |
| assert DagCode.has_dag(dag_id) |
| assert DagVersion.get_latest_version(dag_id) |
| dag = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id)) |
| assert dag.is_stale is True |
| |
| def test_deactivate_deleted_dags(self, dag_maker, session): |
| with dag_maker("test_dag1") as dag1: |
| dag1.relative_fileloc = "test_dag1.py" |
| with dag_maker("test_dag2") as dag2: |
| dag2.relative_fileloc = "test_dag2.py" |
| dag_maker.sync_dagbag_to_db() |
| |
| active_files = [ |
| DagFileInfo( |
| bundle_name="dag_maker", |
| rel_path=Path("test_dag1.py"), |
| bundle_path=TEST_DAGS_FOLDER, |
| ), |
| # Mimic that the test_dag2.py file is deleted |
| ] |
| |
| manager = DagFileProcessorManager(max_runs=1) |
| manager.deactivate_deleted_dags("dag_maker", active_files) |
| |
| # The DAG from test_dag1.py is still active |
| assert session.get(DagModel, "test_dag1").is_stale is False |
| # and the DAG from test_dag2.py is deactivated |
| assert session.get(DagModel, "test_dag2").is_stale is True |
| |
| @pytest.mark.parametrize( |
| "rel_filelocs, expected_return, expected_dag1_stale, expected_dag2_stale", |
| [ |
| pytest.param( |
| ["test_dag1.py"], # Only dag1 present, dag2 deleted |
| True, # Should return True |
| False, # dag1 should not be stale |
| True, # dag2 should be stale |
| id="dags_deactivated", |
| ), |
| pytest.param( |
| ["test_dag1.py", "test_dag2.py"], # Both files present |
| False, # Should return False |
| False, # dag1 should not be stale |
| False, # dag2 should not be stale |
| id="no_dags_deactivated", |
| ), |
| ], |
| ) |
| def test_deactivate_deleted_dags_return_value( |
| self, dag_maker, session, rel_filelocs, expected_return, expected_dag1_stale, expected_dag2_stale |
| ): |
| """Test that DagModel.deactivate_deleted_dags returns correct boolean value.""" |
| with dag_maker("test_dag1") as dag1: |
| dag1.relative_fileloc = "test_dag1.py" |
| with dag_maker("test_dag2") as dag2: |
| dag2.relative_fileloc = "test_dag2.py" |
| dag_maker.sync_dagbag_to_db() |
| |
| any_deactivated = DagModel.deactivate_deleted_dags( |
| bundle_name="dag_maker", |
| rel_filelocs=rel_filelocs, |
| session=session, |
| ) |
| |
| assert any_deactivated is expected_return |
| assert session.get(DagModel, "test_dag1").is_stale is expected_dag1_stale |
| assert session.get(DagModel, "test_dag2").is_stale is expected_dag2_stale |
| |
| @pytest.mark.parametrize( |
| "active_files, should_call_cleanup", |
| [ |
| pytest.param( |
| [ |
| DagFileInfo( |
| bundle_name="dag_maker", |
| rel_path=Path("test_dag1.py"), |
| bundle_path=TEST_DAGS_FOLDER, |
| ), |
| # test_dag2.py is deleted |
| ], |
| True, # Should call cleanup |
| id="dags_deactivated", |
| ), |
| pytest.param( |
| [ |
| DagFileInfo( |
| bundle_name="dag_maker", |
| rel_path=Path("test_dag1.py"), |
| bundle_path=TEST_DAGS_FOLDER, |
| ), |
| DagFileInfo( |
| bundle_name="dag_maker", |
| rel_path=Path("test_dag2.py"), |
| bundle_path=TEST_DAGS_FOLDER, |
| ), |
| ], |
| False, # Should NOT call cleanup |
| id="no_dags_deactivated", |
| ), |
| ], |
| ) |
| @mock.patch("airflow.dag_processing.manager.remove_references_to_deleted_dags") |
| def test_manager_deactivate_deleted_dags_cleanup_behavior( |
| self, mock_remove_references, dag_maker, session, active_files, should_call_cleanup |
| ): |
| """Test that manager conditionally calls remove_references_to_deleted_dags based on whether DAGs were deactivated.""" |
| with dag_maker("test_dag1") as dag1: |
| dag1.relative_fileloc = "test_dag1.py" |
| with dag_maker("test_dag2") as dag2: |
| dag2.relative_fileloc = "test_dag2.py" |
| dag_maker.sync_dagbag_to_db() |
| |
| manager = DagFileProcessorManager(max_runs=1) |
| manager.deactivate_deleted_dags("dag_maker", active_files) |
| |
| if should_call_cleanup: |
| mock_remove_references.assert_called_once() |
| else: |
| mock_remove_references.assert_not_called() |
| |
| @conf_vars({("core", "load_examples"): "False"}) |
| def test_fetch_callbacks_from_database(self, configure_testing_dag_bundle): |
| """Test _fetch_callbacks returns callbacks ordered by priority_weight desc.""" |
| |
| dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py" |
| |
| callback1 = DagCallbackRequest( |
| dag_id="test_start_date_scheduling", |
| bundle_name="testing", |
| bundle_version=None, |
| filepath="test_on_failure_callback_dag.py", |
| is_failure_callback=True, |
| run_id="123", |
| ) |
| callback2 = DagCallbackRequest( |
| dag_id="test_start_date_scheduling", |
| bundle_name="testing", |
| bundle_version=None, |
| filepath="test_on_failure_callback_dag.py", |
| is_failure_callback=True, |
| run_id="456", |
| ) |
| |
| with create_session() as session: |
| session.add(DbCallbackRequest(callback=callback1, priority_weight=11)) |
| session.add(DbCallbackRequest(callback=callback2, priority_weight=10)) |
| |
| with configure_testing_dag_bundle(dag_filepath): |
| manager = DagFileProcessorManager(max_runs=1) |
| manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles()) |
| |
| with create_session() as session: |
| callbacks = manager._fetch_callbacks(session=session) |
| |
| # Should return callbacks ordered by priority_weight desc (highest first) |
| assert callbacks[0].run_id == "123" |
| assert callbacks[1].run_id == "456" |
| |
| assert session.query(DbCallbackRequest).count() == 0 |
| |
| @conf_vars( |
| { |
| ("dag_processor", "max_callbacks_per_loop"): "2", |
| ("core", "load_examples"): "False", |
| } |
| ) |
| def test_fetch_callbacks_from_database_max_per_loop(self, tmp_path, configure_testing_dag_bundle): |
| """Test DagFileProcessorManager._fetch_callbacks method""" |
| dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py" |
| |
| with create_session() as session: |
| for i in range(5): |
| callback = DagCallbackRequest( |
| dag_id="test_start_date_scheduling", |
| bundle_name="testing", |
| bundle_version=None, |
| filepath="test_on_failure_callback_dag.py", |
| is_failure_callback=True, |
| run_id=str(i), |
| ) |
| session.add(DbCallbackRequest(callback=callback, priority_weight=i)) |
| |
| with configure_testing_dag_bundle(dag_filepath): |
| manager = DagFileProcessorManager(max_runs=1) |
| |
| with create_session() as session: |
| manager.run() |
| assert session.query(DbCallbackRequest).count() == 3 |
| |
| with create_session() as session: |
| manager.run() |
| assert session.query(DbCallbackRequest).count() == 1 |
| |
| @conf_vars({("core", "load_examples"): "False"}) |
| def test_fetch_callbacks_ignores_other_bundles(self, configure_testing_dag_bundle): |
| """Ensure callbacks for bundles not owned by current dag processor manager are ignored and not deleted.""" |
| |
| dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py" |
| |
| # Create two callbacks: one for the active 'testing' bundle and one for a different bundle |
| matching = DagCallbackRequest( |
| dag_id="test_start_date_scheduling", |
| bundle_name="testing", |
| bundle_version=None, |
| filepath="test_on_failure_callback_dag.py", |
| is_failure_callback=True, |
| run_id="match", |
| ) |
| non_matching = DagCallbackRequest( |
| dag_id="test_start_date_scheduling", |
| bundle_name="other-bundle", |
| bundle_version=None, |
| filepath="test_on_failure_callback_dag.py", |
| is_failure_callback=True, |
| run_id="no-match", |
| ) |
| |
| with create_session() as session: |
| session.add(DbCallbackRequest(callback=matching, priority_weight=100)) |
| session.add(DbCallbackRequest(callback=non_matching, priority_weight=200)) |
| |
| with configure_testing_dag_bundle(dag_filepath): |
| manager = DagFileProcessorManager(max_runs=1) |
| manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles()) |
| |
| with create_session() as session: |
| callbacks = manager._fetch_callbacks(session=session) |
| |
| # Only the matching callback should be returned |
| assert [c.run_id for c in callbacks] == ["match"] |
| |
| # The non-matching callback should remain in the DB |
| remaining = session.query(DbCallbackRequest).all() |
| assert len(remaining) == 1 |
| # Decode remaining request and verify it's for the other bundle |
| remaining_req = remaining[0].get_callback_request() |
| assert remaining_req.bundle_name == "other-bundle" |
| |
| @mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file") |
| def test_callback_queue(self, mock_get_logger, configure_testing_dag_bundle): |
| mock_logger = MagicMock() |
| mock_filehandle = MagicMock() |
| mock_get_logger.return_value = [mock_logger, mock_filehandle] |
| |
| tmp_path = "/green_eggs/ham" |
| with configure_testing_dag_bundle(tmp_path): |
| # given |
| manager = DagFileProcessorManager( |
| max_runs=1, |
| processor_timeout=365 * 86_400, |
| ) |
| manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles()) |
| |
| dag1_path = DagFileInfo( |
| bundle_name="testing", rel_path=Path("file1.py"), bundle_path=Path(tmp_path) |
| ) |
| dag1_req1 = DagCallbackRequest( |
| filepath="file1.py", |
| dag_id="dag1", |
| run_id="run1", |
| is_failure_callback=False, |
| bundle_name="testing", |
| bundle_version=None, |
| msg=None, |
| ) |
| dag1_req2 = DagCallbackRequest( |
| filepath="file1.py", |
| dag_id="dag1", |
| run_id="run1", |
| is_failure_callback=False, |
| bundle_name="testing", |
| bundle_version=None, |
| msg=None, |
| ) |
| |
| dag2_path = DagFileInfo( |
| bundle_name="testing", rel_path=Path("file2.py"), bundle_path=Path(tmp_path) |
| ) |
| dag2_req1 = DagCallbackRequest( |
| filepath="file2.py", |
| dag_id="dag2", |
| run_id="run1", |
| bundle_name=dag2_path.bundle_name, |
| bundle_version=None, |
| is_failure_callback=False, |
| msg=None, |
| ) |
| |
| # when |
| manager._add_callback_to_queue(dag1_req1) |
| manager._add_callback_to_queue(dag2_req1) |
| |
| # then - requests should be in manager's queue, with dag2 ahead of dag1 (because it was added last) |
| assert manager._file_queue == deque([dag2_path, dag1_path]) |
| assert set(manager._callback_to_execute.keys()) == { |
| dag1_path, |
| dag2_path, |
| } |
| assert manager._callback_to_execute[dag2_path] == [dag2_req1] |
| |
| # the queue is not updated again, although the callbacks are already registered |
| assert manager._file_queue == deque([dag2_path, dag1_path]) |
| |
| # when |
| manager._add_callback_to_queue(dag1_req2) |
| # Since dag1_req2 is for the same file as dag1_req1, it is not queued again; still 2 items in the file queue |
| assert manager._file_queue == deque([dag2_path, dag1_path]) |
| assert manager._callback_to_execute[dag1_path] == [ |
| dag1_req1, |
| dag1_req2, |
| ] |
| |
| with mock.patch.object( |
| DagFileProcessorProcess, "start", side_effect=lambda *args, **kwargs: self.mock_processor() |
| ) as start: |
| manager._start_new_processes() |
| # Callbacks passed to processor |
| assert start.call_args_list == [ |
| mock.call( |
| id=mock.ANY, |
| path=Path(dag2_path.bundle_path, dag2_path.rel_path), |
| bundle_path=dag2_path.bundle_path, |
| callbacks=[dag2_req1], |
| selector=mock.ANY, |
| logger=mock_logger, |
| logger_filehandle=mock_filehandle, |
| client=mock.ANY, |
| ), |
| mock.call( |
| id=mock.ANY, |
| path=Path(dag1_path.bundle_path, dag1_path.rel_path), |
| bundle_path=dag1_path.bundle_path, |
| callbacks=[dag1_req1, dag1_req2], |
| selector=mock.ANY, |
| logger=mock_logger, |
| logger_filehandle=mock_filehandle, |
| client=mock.ANY, |
| ), |
| ] |
| # And removed from the queue |
| assert dag1_path not in manager._callback_to_execute |
| assert dag2_path not in manager._callback_to_execute |
| |
| def test_dag_with_assets(self, session, configure_testing_dag_bundle): |
| """'Integration' test to ensure that the assets get parsed and stored correctly for parsed dags.""" |
| test_dag_path = str(TEST_DAG_FOLDER / "test_assets.py") |
| |
| with configure_testing_dag_bundle(test_dag_path): |
| manager = DagFileProcessorManager( |
| max_runs=1, |
| processor_timeout=365 * 86_400, |
| ) |
| manager.run() |
| |
| dag_model = session.get(DagModel, ("dag_with_skip_task")) |
| assert dag_model.task_outlet_asset_references == [ |
| TaskOutletAssetReference(asset_id=mock.ANY, dag_id="dag_with_skip_task", task_id="skip_task") |
| ] |
| |
| def test_bundles_are_refreshed(self): |
| """ |
| Ensure bundles are refreshed by the manager, when necessary. |
| |
| - always refresh all bundles when starting the manager |
| - refresh if the bundle hasn't been refreshed in the refresh_interval |
| - when the latest_version in the db doesn't match the version this manager knows about |
| """ |
| config = [ |
| { |
| "name": "bundleone", |
| "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", |
| "kwargs": {"path": "/dev/null", "refresh_interval": 0}, |
| }, |
| { |
| "name": "bundletwo", |
| "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", |
| "kwargs": {"path": "/dev/null", "refresh_interval": 300}, |
| }, |
| ] |
| |
| bundleone = MagicMock() |
| bundleone.name = "bundleone" |
| bundleone.path = "/dev/null" |
| bundleone.refresh_interval = 0 |
| bundleone.get_current_version.return_value = None |
| bundletwo = MagicMock() |
| bundletwo.name = "bundletwo" |
| bundletwo.path = "/dev/null" |
| bundletwo.refresh_interval = 300 |
| bundletwo.get_current_version.return_value = None |
| |
| with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): |
| DagBundlesManager().sync_bundles_to_db() |
| with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: |
| mock_bundle_manager.return_value._bundle_config = {"bundleone": None, "bundletwo": None} |
| mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone, bundletwo] |
| |
| # We should refresh bundleone twice, but bundletwo only once - it has a long refresh_interval |
| manager = DagFileProcessorManager(max_runs=2) |
| manager.run() |
| assert bundleone.refresh.call_count == 2 |
| bundletwo.refresh.assert_called_once() |
| |
| # Now, we should refresh both bundles, regardless of the refresh_interval |
| # as we are starting up a fresh manager |
| bundleone.reset_mock() |
| bundletwo.reset_mock() |
| manager = DagFileProcessorManager(max_runs=2) |
| manager.run() |
| assert bundleone.refresh.call_count == 2 |
| bundletwo.refresh.assert_called_once() |
| |
| # however, if the version doesn't match, we should still refresh |
| bundletwo.reset_mock() |
| |
| def _update_bundletwo_version(): |
| # We will update the bundle version in the db, so the next manager loop |
| # will believe another processor had seen a new version |
| with create_session() as session: |
| bundletwo_model = session.get(DagBundleModel, "bundletwo") |
| bundletwo_model.version = "123" |
| |
| bundletwo.refresh.side_effect = _update_bundletwo_version |
| manager = DagFileProcessorManager(max_runs=2) |
| manager.run() |
| assert bundletwo.refresh.call_count == 2 |
| |
| def test_bundle_refresh_check_interval(self): |
| """Ensure dag processor doesn't refresh bundles every loop.""" |
| config = [ |
| { |
| "name": "bundleone", |
| "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", |
| "kwargs": {"path": "/dev/null", "refresh_interval": 0}, |
| }, |
| ] |
| |
| bundleone = MagicMock() |
| bundleone.name = "bundleone" |
| bundleone.path = "/dev/null" |
| bundleone.refresh_interval = 0 |
| bundleone.get_current_version.return_value = None |
| |
| with conf_vars( |
| { |
| ("dag_processor", "dag_bundle_config_list"): json.dumps(config), |
| ("dag_processor", "bundle_refresh_check_interval"): "10", |
| } |
| ): |
| DagBundlesManager().sync_bundles_to_db() |
| manager = DagFileProcessorManager(max_runs=2) |
| manager._dag_bundles = [bundleone] |
| manager._refresh_dag_bundles({}) |
| assert bundleone.refresh.call_count == 1 |
| manager._refresh_dag_bundles({}) |
| assert bundleone.refresh.call_count == 1 # didn't refresh the second time |
| |
| def test_bundle_force_refresh(self): |
| """Ensure the dag processor honors force refreshing a bundle.""" |
| config = [ |
| { |
| "name": "bundleone", |
| "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", |
| "kwargs": {"path": "/dev/null", "refresh_interval": 0}, |
| }, |
| ] |
| |
| bundleone = MagicMock() |
| bundleone.name = "bundleone" |
| bundleone.path = "/dev/null" |
| bundleone.refresh_interval = 0 |
| bundleone.get_current_version.return_value = None |
| |
| with conf_vars( |
| { |
| ("dag_processor", "dag_bundle_config_list"): json.dumps(config), |
| ("dag_processor", "bundle_refresh_check_interval"): "10", |
| } |
| ): |
| DagBundlesManager().sync_bundles_to_db() |
| manager = DagFileProcessorManager(max_runs=2) |
| manager._dag_bundles = [bundleone] |
| manager._refresh_dag_bundles({}) |
| assert bundleone.refresh.call_count == 1 |
| manager._force_refresh_bundles = {"bundleone"} |
| manager._refresh_dag_bundles({}) |
| assert bundleone.refresh.call_count == 2 # forced refresh |
| |
| def test_bundles_versions_are_stored(self, session): |
| config = [ |
| { |
| "name": "bundleone", |
| "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", |
| "kwargs": {"path": "/dev/null", "refresh_interval": 0}, |
| }, |
| ] |
| |
| mybundle = MagicMock() |
| mybundle.name = "bundleone" |
| mybundle.path = "/dev/null" |
| mybundle.refresh_interval = 0 |
| mybundle.supports_versioning = True |
| mybundle.get_current_version.return_value = "123" |
| |
| with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): |
| DagBundlesManager().sync_bundles_to_db() |
| with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: |
| mock_bundle_manager.return_value._bundle_config = {"bundleone": None} |
| mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [mybundle] |
| manager = DagFileProcessorManager(max_runs=1) |
| manager.run() |
| |
| with create_session() as session: |
| model = session.get(DagBundleModel, "bundleone") |
| assert model.version == "123" |
| |
| def test_non_versioned_bundle_get_version_not_called(self): |
| config = [ |
| { |
| "name": "bundleone", |
| "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", |
| "kwargs": {"path": "/dev/null", "refresh_interval": 0}, |
| }, |
| ] |
| |
| bundleone = MagicMock() |
| bundleone.name = "bundleone" |
| bundleone.refresh_interval = 0 |
| bundleone.supports_versioning = False |
| bundleone.path = Path("/dev/null") |
| |
| with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): |
| DagBundlesManager().sync_bundles_to_db() |
| with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: |
| mock_bundle_manager.return_value._bundle_config = {"bundleone": None} |
| mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone] |
| manager = DagFileProcessorManager(max_runs=1) |
| manager.run() |
| |
| bundleone.refresh.assert_called_once() |
| bundleone.get_current_version.assert_not_called() |
| |
| def test_versioned_bundle_get_version_called_once(self): |
| """Make sure that in a normal "warm" loop, get_current_version is called just once after refresh""" |
| config = [ |
| { |
| "name": "bundleone", |
| "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", |
| "kwargs": {"path": "/dev/null", "refresh_interval": 0}, |
| }, |
| ] |
| |
| bundleone = MagicMock() |
| bundleone.name = "bundleone" |
| bundleone.refresh_interval = 0 |
| bundleone.supports_versioning = True |
| bundleone.get_current_version.return_value = "123" |
| bundleone.path = Path("/dev/null") |
| |
| with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): |
| DagBundlesManager().sync_bundles_to_db() |
| with mock.patch("airflow.dag_processing.manager.DagBundlesManager") as mock_bundle_manager: |
| mock_bundle_manager.return_value._bundle_config = {"bundleone": None} |
| mock_bundle_manager.return_value.get_all_dag_bundles.return_value = [bundleone] |
| manager = DagFileProcessorManager(max_runs=1) |
| manager.run() # run it once to warm up |
| |
| # now run it again so we can check we only call get_current_version once |
| bundleone.refresh.reset_mock() |
| bundleone.get_current_version.reset_mock() |
| manager.run() |
| bundleone.refresh.assert_called_once() |
| bundleone.get_current_version.assert_called_once() |
| |
| @pytest.mark.parametrize( |
| "bundle_names, expected", |
| [ |
| (None, {"bundle1", "bundle2", "bundle3"}), |
| (["bundle1"], {"bundle1"}), |
| (["bundle1", "bundle2"], {"bundle1", "bundle2"}), |
| ], |
| ) |
| def test_bundle_names_to_parse(self, bundle_names, expected, configure_dag_bundles): |
| config = {f"bundle{i}": os.devnull for i in range(1, 4)} |
| with configure_dag_bundles(config): |
| manager = DagFileProcessorManager(max_runs=1, bundle_names_to_parse=bundle_names) |
| manager._run_parsing_loop = MagicMock() |
| manager.run() |
| |
| bundle_names_being_parsed = {b.name for b in manager._dag_bundles} |
| assert bundle_names_being_parsed == expected |
| |
| @conf_vars({("core", "multi_team"): "true"}) |
| def test_bundles_with_team(self, session): |
| team1_name = "test_team1" |
| team2_name = "test_team2" |
| |
| # Create two teams |
| session.add(Team(name=team1_name)) |
| session.add(Team(name=team2_name)) |
| session.commit() |
| |
| # Associate a dag bundle to a team |
| config = [ |
| { |
| "name": "bundle_team", |
| "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", |
| "kwargs": {}, |
| "team_name": team1_name, |
| }, |
| ] |
| |
| with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): |
| DagBundlesManager().sync_bundles_to_db() |
| |
| team = session.scalars(select(Team).where(Team.name == team1_name)).one() |
| assert len(team.dag_bundles) == 1 |
| assert team.dag_bundles[0].name == "bundle_team" |
| |
| # Change the team ownership |
| config = [ |
| { |
| "name": "bundle_team", |
| "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", |
| "kwargs": {}, |
| "team_name": team2_name, |
| }, |
| ] |
| |
| with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): |
| DagBundlesManager().sync_bundles_to_db() |
| |
| team1 = session.scalars(select(Team).where(Team.name == team1_name)).one() |
| assert len(team1.dag_bundles) == 0 |
| team2 = session.scalars(select(Team).where(Team.name == team2_name)).one() |
| assert len(team2.dag_bundles) == 1 |
| assert team2.dag_bundles[0].name == "bundle_team" |
| |
| # Delete the team ownership |
| config = [ |
| { |
| "name": "bundle_team", |
| "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", |
| "kwargs": {}, |
| }, |
| ] |
| |
| with conf_vars({("dag_processor", "dag_bundle_config_list"): json.dumps(config)}): |
| DagBundlesManager().sync_bundles_to_db() |
| |
| team1 = session.scalars(select(Team).where(Team.name == team1_name)).one() |
| assert len(team1.dag_bundles) == 0 |
| team2 = session.scalars(select(Team).where(Team.name == team2_name)).one() |
| assert len(team2.dag_bundles) == 0 |