| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import annotations |
| |
| import os |
| import zipfile |
| from pathlib import Path |
| from unittest import mock |
| |
| import pytest |
| |
| from airflow.utils import file as file_utils |
| from airflow.utils.file import ( |
| correct_maybe_zipped, |
| find_path_from_directory, |
| list_py_file_paths, |
| open_maybe_zipped, |
| ) |
| |
| from tests.models import TEST_DAGS_FOLDER |
| from tests_common.test_utils.config import conf_vars |
| |
| TEST_DAG_FOLDER = os.environ["AIRFLOW__CORE__DAGS_FOLDER"] |
| |
| |
| def might_contain_dag(file_path: str, zip_file: zipfile.ZipFile | None = None): |
| return False |
| |
| |
| class TestCorrectMaybeZipped: |
| @mock.patch("zipfile.is_zipfile") |
| def test_correct_maybe_zipped_normal_file(self, mocked_is_zipfile): |
| path = "/path/to/some/file.txt" |
| mocked_is_zipfile.return_value = False |
| |
| dag_folder = correct_maybe_zipped(path) |
| |
| assert dag_folder == path |
| |
| @mock.patch("zipfile.is_zipfile") |
| def test_correct_maybe_zipped_normal_file_with_zip_in_name(self, mocked_is_zipfile): |
| path = "/path/to/fakearchive.zip.other/file.txt" |
| mocked_is_zipfile.return_value = False |
| |
| dag_folder = correct_maybe_zipped(path) |
| |
| assert dag_folder == path |
| |
| @mock.patch("zipfile.is_zipfile") |
| def test_correct_maybe_zipped_archive(self, mocked_is_zipfile): |
| path = "/path/to/archive.zip/deep/path/to/file.txt" |
| mocked_is_zipfile.return_value = True |
| |
| dag_folder = correct_maybe_zipped(path) |
| |
| assert mocked_is_zipfile.call_count == 1 |
| (args, kwargs) = mocked_is_zipfile.call_args_list[0] |
| assert args[0] == "/path/to/archive.zip" |
| |
| assert dag_folder == "/path/to/archive.zip" |
| |
| |
| class TestOpenMaybeZipped: |
| def test_open_maybe_zipped_normal_file(self): |
| test_file_path = os.path.join(TEST_DAGS_FOLDER, "no_dags.py") |
| with open_maybe_zipped(test_file_path, "r") as test_file: |
| content = test_file.read() |
| assert isinstance(content, str) |
| |
| def test_open_maybe_zipped_normal_file_with_zip_in_name(self): |
| path = "/path/to/fakearchive.zip.other/file.txt" |
| with mock.patch("builtins.open", mock.mock_open(read_data="data")) as mock_file: |
| open_maybe_zipped(path) |
| mock_file.assert_called_once_with(path, mode="r") |
| |
| def test_open_maybe_zipped_archive(self, test_zip_path): |
| test_file_path = os.path.join(test_zip_path, "test_zip.py") |
| with open_maybe_zipped(test_file_path, "r") as test_file: |
| content = test_file.read() |
| assert isinstance(content, str) |
| |
| |
| class TestListPyFilesPath: |
| @pytest.fixture |
| def test_dir(self, tmp_path): |
| # create test tree with symlinks |
| source = os.path.join(tmp_path, "folder") |
| target = os.path.join(tmp_path, "symlink") |
| py_file = os.path.join(source, "hello_world.py") |
| ignore_file = os.path.join(tmp_path, ".airflowignore") |
| os.mkdir(source) |
| os.symlink(source, target) |
| # write ignore files |
| with open(ignore_file, "w") as f: |
| f.write("folder") |
| # write sample pyfile |
| with open(py_file, "w") as f: |
| f.write("print('hello world')") |
| return tmp_path |
| |
| def test_find_path_from_directory_regex_ignore(self): |
| should_ignore = [ |
| "test_invalid_cron.py", |
| "test_invalid_param.py", |
| "test_ignore_this.py", |
| ] |
| files = find_path_from_directory(TEST_DAGS_FOLDER, ".airflowignore") |
| |
| assert files |
| assert all(os.path.basename(file) not in should_ignore for file in files) |
| |
| def test_find_path_from_directory_glob_ignore(self): |
| should_ignore = [ |
| "test_invalid_cron.py", |
| "test_invalid_param.py", |
| "test_ignore_this.py", |
| "test_prev_dagrun_dep.py", |
| "test_nested_dag.py", |
| ".airflowignore", |
| ] |
| should_not_ignore = [ |
| "test_on_kill.py", |
| "test_dont_ignore_this.py", |
| ] |
| files = list(find_path_from_directory(TEST_DAGS_FOLDER, ".airflowignore_glob", "glob")) |
| |
| assert files |
| assert all(os.path.basename(file) not in should_ignore for file in files) |
| assert sum(1 for file in files if os.path.basename(file) in should_not_ignore) == len( |
| should_not_ignore |
| ) |
| |
| def test_find_path_from_directory_respects_symlinks_regexp_ignore(self, test_dir): |
| ignore_list_file = ".airflowignore" |
| found = list(find_path_from_directory(test_dir, ignore_list_file)) |
| |
| assert os.path.join(test_dir, "symlink", "hello_world.py") in found |
| assert os.path.join(test_dir, "folder", "hello_world.py") not in found |
| |
| def test_find_path_from_directory_respects_symlinks_glob_ignore(self, test_dir): |
| ignore_list_file = ".airflowignore" |
| found = list(find_path_from_directory(test_dir, ignore_list_file, ignore_file_syntax="glob")) |
| |
| assert os.path.join(test_dir, "symlink", "hello_world.py") in found |
| assert os.path.join(test_dir, "folder", "hello_world.py") not in found |
| |
| def test_find_path_from_directory_fails_on_recursive_link(self, test_dir): |
| # add a recursive link |
| recursing_src = os.path.join(test_dir, "folder2", "recursor") |
| recursing_tgt = os.path.join(test_dir, "folder2") |
| os.mkdir(recursing_tgt) |
| os.symlink(recursing_tgt, recursing_src) |
| |
| ignore_list_file = ".airflowignore" |
| |
| error_message = ( |
| f"Detected recursive loop when walking DAG directory {test_dir}: " |
| f"{Path(recursing_tgt).resolve()} has appeared more than once." |
| ) |
| with pytest.raises(RuntimeError, match=error_message): |
| list(find_path_from_directory(test_dir, ignore_list_file, ignore_file_syntax="glob")) |
| |
| def test_might_contain_dag_with_default_callable(self): |
| file_path_with_dag = os.path.join(TEST_DAGS_FOLDER, "test_scheduler_dags.py") |
| |
| assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=True) |
| |
| @conf_vars({("core", "might_contain_dag_callable"): "tests.utils.test_file.might_contain_dag"}) |
| def test_might_contain_dag(self): |
| """Test might_contain_dag_callable""" |
| file_path_with_dag = os.path.join(TEST_DAGS_FOLDER, "test_scheduler_dags.py") |
| |
| # There is a DAG defined in the file_path_with_dag, however, the might_contain_dag_callable |
| # returns False no matter what, which is used to test might_contain_dag_callable actually |
| # overrides the default function |
| assert not file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=True) |
| |
| # With safe_mode is False, the user defined callable won't be invoked |
| assert file_utils.might_contain_dag(file_path=file_path_with_dag, safe_mode=False) |
| |
| def test_get_modules(self): |
| file_path = os.path.join(TEST_DAGS_FOLDER, "test_imports.py") |
| |
| modules = list(file_utils.iter_airflow_imports(file_path)) |
| |
| assert len(modules) == 4 |
| assert "airflow.utils" in modules |
| assert "airflow.decorators" in modules |
| assert "airflow.models" in modules |
| assert "airflow.sensors" in modules |
| # this one is a local import, we don't want it. |
| assert "airflow.local_import" not in modules |
| # this one is in a comment, we don't want it |
| assert "airflow.in_comment" not in modules |
| # we don't want imports under conditions |
| assert "airflow.if_branch" not in modules |
| assert "airflow.else_branch" not in modules |
| |
| def test_get_modules_from_invalid_file(self): |
| file_path = os.path.join(TEST_DAGS_FOLDER, "README.md") # just getting a non-python file |
| |
| # should not error |
| modules = list(file_utils.iter_airflow_imports(file_path)) |
| |
| assert len(modules) == 0 |
| |
| def test_list_py_file_paths(self, test_zip_path): |
| detected_files = set() |
| expected_files = set() |
| # No_dags is empty, _invalid_ is ignored by .airflowignore |
| ignored_files = { |
| "no_dags.py", |
| "test_invalid_cron.py", |
| "test_invalid_dup_task.py", |
| "test_ignore_this.py", |
| "test_invalid_param.py", |
| "test_invalid_param2.py", |
| "test_invalid_param3.py", |
| "test_invalid_param4.py", |
| "test_nested_dag.py", |
| "test_imports.py", |
| "file_no_airflow_dag.py", # no_dag test case in test_zip folder |
| "test.py", # no_dag test case in test_zip_module folder |
| "__init__.py", |
| } |
| for root, _, files in os.walk(TEST_DAG_FOLDER): |
| for file_name in files: |
| if file_name.endswith((".py", ".zip")): |
| if file_name not in ignored_files: |
| expected_files.add(f"{root}/{file_name}") |
| detected_files = set(list_py_file_paths(TEST_DAG_FOLDER)) |
| assert detected_files == expected_files |
| |
| |
| @pytest.mark.parametrize( |
| "edge_filename, expected_modification", |
| [ |
| ("test_dag.py", "unusual_prefix_mocked_path_hash_sha1_test_dag"), |
| ("test-dag.py", "unusual_prefix_mocked_path_hash_sha1_test_dag"), |
| ("test-dag-1.py", "unusual_prefix_mocked_path_hash_sha1_test_dag_1"), |
| ("test-dag_1.py", "unusual_prefix_mocked_path_hash_sha1_test_dag_1"), |
| ("test-dag.dev.py", "unusual_prefix_mocked_path_hash_sha1_test_dag_dev"), |
| ("test_dag.prod.py", "unusual_prefix_mocked_path_hash_sha1_test_dag_prod"), |
| ], |
| ) |
| def test_get_unique_dag_module_name(edge_filename, expected_modification): |
| with mock.patch("hashlib.sha1") as mocked_sha1: |
| mocked_sha1.return_value.hexdigest.return_value = "mocked_path_hash_sha1" |
| modify_module_name = file_utils.get_unique_dag_module_name(edge_filename) |
| assert modify_module_name == expected_modification |