blob: 496ae8ba02e40993ad550d0b37053c8f4c949e68 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import ast
import io
import logging
import os
import re
import zipfile
from collections import OrderedDict
from pathlib import Path
from typing import TYPE_CHECKING, Generator, NamedTuple, Pattern, overload
from pathspec.patterns import GitWildMatchPattern
from typing_extensions import Protocol
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
if TYPE_CHECKING:
import pathlib
log = logging.getLogger(__name__)
class _IgnoreRule(Protocol):
    """Interface for ignore rules for structural subtyping.

    Concrete rule classes are matched structurally (no inheritance needed);
    they only have to provide the two static methods below.
    """

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Build an ignore rule from the supplied pattern where base_dir
        and definition_file should be absolute paths.

        Implementations return None (after logging a warning) when the pattern is invalid.
        """

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Match a candidate absolute path against a list of rules."""
class _RegexpIgnoreRule(NamedTuple):
"""Typed namedtuple with utility functions for regexp ignore rules."""
pattern: Pattern
base_dir: Path
@staticmethod
def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
"""Build an ignore rule from the supplied regexp pattern and log a useful warning if it is invalid."""
try:
return _RegexpIgnoreRule(re.compile(pattern), base_dir)
except re.error as e:
log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
return None
@staticmethod
def match(path: Path, rules: list[_IgnoreRule]) -> bool:
"""Match a list of ignore rules against the supplied path."""
for rule in rules:
if not isinstance(rule, _RegexpIgnoreRule):
raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}")
if rule.pattern.search(str(path.relative_to(rule.base_dir))) is not None:
return True
return False
class _GlobIgnoreRule(NamedTuple):
    """Typed namedtuple with utility functions for glob ignore rules."""

    # Regex produced by pathspec's GitWildMatchPattern for the raw glob.
    pattern: Pattern
    raw_pattern: str
    include: bool | None = None
    relative_to: Path | None = None

    @staticmethod
    def compile(pattern: str, _, definition_file: Path) -> _IgnoreRule | None:
        """Build an ignore rule from the supplied glob pattern and log a useful warning if it is invalid."""
        if pattern.strip() == "/":
            # "/" doesn't match anything in gitignore
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None
        anchor: Path | None = None
        if pattern.startswith("/") or "/" in pattern.rstrip("/"):
            # See https://git-scm.com/docs/gitignore
            # > If there is a separator at the beginning or middle (or both) of the pattern, then the
            # > pattern is relative to the directory level of the particular .gitignore file itself.
            # > Otherwise the pattern may also match at any level below the .gitignore level.
            anchor = definition_file.parent
        compiled = GitWildMatchPattern(pattern)
        return _GlobIgnoreRule(compiled.regex, pattern, compiled.include, anchor)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Match a list of ignore rules against the supplied path."""
        result = False
        for candidate in rules:
            if not isinstance(candidate, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(candidate)}")
            rule: _GlobIgnoreRule = candidate  # explicit typing to make mypy play nicely
            if rule.relative_to is not None:
                test_path = str(path.relative_to(rule.relative_to))
            else:
                test_path = str(path.name)
            if rule.raw_pattern.endswith("/") and path.is_dir():
                # ensure the test path will potentially match a directory pattern if it is a directory
                test_path += "/"
            if rule.include is not None and rule.pattern.match(test_path) is not None:
                # Later rules intentionally override earlier ones (gitignore negation semantics).
                result = rule.include
        return result
def TemporaryDirectory(*args, **kwargs):
    """This function is deprecated. Please use `tempfile.TemporaryDirectory`."""
    # Imported lazily so the deprecation shim costs nothing unless it is called.
    import tempfile
    import warnings

    warnings.warn(
        "This function is deprecated. Please use `tempfile.TemporaryDirectory`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    return tempfile.TemporaryDirectory(*args, **kwargs)
def mkdirs(path, mode):
    """
    Create the directory given by *path*, including any missing intermediate
    directories. A no-op when the directory already exists.

    :param path: The directory to create
    :param mode: The mode to give to the directory e.g. 0o755, ignores umask
    """
    import warnings

    warnings.warn(
        f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    target = Path(path)
    target.mkdir(mode=mode, parents=True, exist_ok=True)
# Splits a path into (prefix-with-separator, path-to-.zip-archive, remainder).
ZIP_REGEX = re.compile(rf"((.*\.zip){re.escape(os.sep)})?(.*)")


@overload
def correct_maybe_zipped(fileloc: None) -> None: ...


@overload
def correct_maybe_zipped(fileloc: str | Path) -> str | Path: ...


def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
    """
    If the path contains a folder with a .zip suffix, then
    the folder is treated as a zip archive and path to zip is returned.
    """
    # None and "" pass straight through unchanged.
    if not fileloc:
        return fileloc
    found = ZIP_REGEX.search(str(fileloc))
    if not found:
        return fileloc
    archive = found.group(2)
    # Only rewrite the path when the ".zip" component is a real zip archive on disk.
    if archive and zipfile.is_zipfile(archive):
        return archive
    return fileloc
def open_maybe_zipped(fileloc, mode="r"):
    """
    Open the given file, transparently handling paths that route through a
    ``.zip`` folder: such a folder is treated as a zip archive and the member
    inside it is opened instead.

    :return: a file object, as in `open`, or as in `ZipFile.open`.
    """
    groups = ZIP_REGEX.search(fileloc).groups()
    archive, member = groups[1], groups[2]
    if archive and zipfile.is_zipfile(archive):
        # ZipFile.open yields bytes; wrap it so callers always get a text interface.
        return io.TextIOWrapper(zipfile.ZipFile(archive, mode=mode).open(member))
    return open(fileloc, mode=mode)
def _find_path_from_directory(
    base_dir_path: str,
    ignore_file_name: str,
    ignore_rule_type: type[_IgnoreRule],
) -> Generator[str, None, None]:
    """
    Recursively search the base path and return the list of file paths that should not be ignored by
    regular expressions in any ignore files at each directory level.

    Rules accumulate while walking: each directory inherits its parent's rules and adds
    those from its own ignore file, so deeper ignore files refine shallower ones.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the file name containing regular expressions for files that should be ignored.
    :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.
    :return: a generator of file paths which should not be ignored.
    """
    # A Dict of patterns, keyed using resolved, absolute paths
    patterns_by_dir: dict[Path, list[_IgnoreRule]] = {}
    for root, dirs, files in os.walk(base_dir_path, followlinks=True):
        # Start from the rules inherited from this directory's parent (empty at the top level).
        patterns: list[_IgnoreRule] = patterns_by_dir.get(Path(root).resolve(), [])
        ignore_file_path = Path(root) / ignore_file_name
        if ignore_file_path.is_file():
            with open(ignore_file_path) as ifile:
                # Strip inline "#" comments (and the whitespace before them) from every line.
                lines_no_comments = [re.sub(r"\s*#.*", "", line) for line in ifile.read().split("\n")]
                # append new patterns and filter out "None" objects, which are invalid patterns
                patterns += [
                    p
                    for p in [
                        ignore_rule_type.compile(line, Path(base_dir_path), ignore_file_path)
                        for line in lines_no_comments
                        if line
                    ]
                    if p is not None
                ]
                # evaluation order of patterns is important with negation
                # so that later patterns can override earlier patterns
                patterns = list(OrderedDict.fromkeys(patterns).keys())
        # Prune ignored subdirectories in place so os.walk never descends into them.
        dirs[:] = [subdir for subdir in dirs if not ignore_rule_type.match(Path(root) / subdir, patterns)]
        # explicit loop for infinite recursion detection since we are following symlinks in this walk
        for sd in dirs:
            dirpath = (Path(root) / sd).resolve()
            if dirpath in patterns_by_dir:
                raise RuntimeError(
                    "Detected recursive loop when walking DAG directory "
                    f"{base_dir_path}: {dirpath} has appeared more than once."
                )
            # Hand the accumulated rules down to each surviving child directory.
            patterns_by_dir.update({dirpath: patterns.copy()})
        for file in files:
            # The ignore file itself is never yielded.
            if file == ignore_file_name:
                continue
            abs_file_path = Path(root) / file
            if ignore_rule_type.match(abs_file_path, patterns):
                continue
            yield str(abs_file_path)
def find_path_from_directory(
    base_dir_path: str,
    ignore_file_name: str,
    ignore_file_syntax: str = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="regexp"),
) -> Generator[str, None, None]:
    """
    Recursively search the base path and return the list of file paths that should not be ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the file name in which specifies the patterns of files/dirs to be ignored
    :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob
    :return: a generator of file paths.
    """
    # Map the configured syntax onto a concrete rule implementation, then walk once.
    if ignore_file_syntax == "glob":
        rule_type: type[_IgnoreRule] = _GlobIgnoreRule
    elif ignore_file_syntax == "regexp" or not ignore_file_syntax:
        # An empty/unset syntax falls back to the historical regexp behavior.
        rule_type = _RegexpIgnoreRule
    else:
        raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
    return _find_path_from_directory(base_dir_path, ignore_file_name, rule_type)
def list_py_file_paths(
    directory: str | pathlib.Path,
    safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
    include_examples: bool | None = None,
) -> list[str]:
    """
    Traverse a directory and look for Python files.

    :param directory: the directory to traverse
    :param safe_mode: whether to use a heuristic to determine whether a file
        contains Airflow DAG definitions. If not provided, use the
        core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
        to safe.
    :param include_examples: include example DAGs
    :return: a list of paths to Python files in the specified directory
    """
    if include_examples is None:
        include_examples = conf.getboolean("core", "LOAD_EXAMPLES")
    paths: list[str] = []
    if directory is not None:
        if os.path.isfile(directory):
            # A direct file path is returned as-is, without any DAG heuristics.
            paths.append(str(directory))
        elif os.path.isdir(directory):
            paths.extend(find_dag_file_paths(directory, safe_mode))
    if include_examples:
        from airflow import example_dags

        example_dag_folder = example_dags.__path__[0]  # type: ignore
        # Recurse once into the bundled examples; never re-include examples from there.
        paths.extend(list_py_file_paths(example_dag_folder, safe_mode, include_examples=False))
    return paths
def find_dag_file_paths(directory: str | pathlib.Path, safe_mode: bool) -> list[str]:
    """Finds file paths of all DAG files."""
    dag_files: list[str] = []
    for candidate in find_path_from_directory(str(directory), ".airflowignore"):
        try:
            if not os.path.isfile(candidate):
                continue
            # Only .py files and zip archives can contain DAG definitions.
            _, ext = os.path.splitext(os.path.split(candidate)[-1])
            if ext != ".py" and not zipfile.is_zipfile(candidate):
                continue
            if might_contain_dag(candidate, safe_mode):
                dag_files.append(candidate)
        except Exception:
            # A single unreadable file must not abort DAG discovery.
            log.exception("Error while examining %s", candidate)
    return dag_files
# Matches an inline "#" comment together with any whitespace preceding it;
# not referenced in the visible code — presumably used by other modules (TODO confirm).
COMMENT_PATTERN = re.compile(r"\s*#.*")
def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Check whether a Python file contains Airflow DAGs.

    When safe_mode is off (with False value), this function always returns True.

    If might_contain_dag_callable isn't specified, it uses airflow default heuristic
    """
    if not safe_mode:
        # Safe mode disabled: treat every file as a potential DAG file.
        return True
    # The heuristic is configurable; fall back to the bundled default when unset.
    heuristic = conf.getimport(
        "core",
        "might_contain_dag_callable",
        fallback="airflow.utils.file.might_contain_dag_via_default_heuristic",
    )
    return heuristic(file_path=file_path, zip_file=zip_file)
def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Heuristic that guesses whether a Python file contains an Airflow DAG definition.

    :param file_path: Path to the file to be checked.
    :param zip_file: if passed, checks the archive. Otherwise, check local filesystem.
    :return: True, if file might contain DAGs.
    """
    if zip_file:
        with zip_file.open(file_path) as member:
            data = member.read()
    else:
        if zipfile.is_zipfile(file_path):
            # Zip archives on the local filesystem are always considered candidates.
            return True
        with open(file_path, "rb") as fh:
            data = fh.read()
    # Case-insensitive check: the file must mention both "dag" and "airflow".
    lowered = data.lower()
    return b"airflow" in lowered and b"dag" in lowered
def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
    """Yield every module name imported at the top level of the parsed *module*."""
    for node in module.body:
        if isinstance(node, ast.Import):
            yield from (alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module is not None:
            # Relative imports ("from . import x") have module=None and are skipped.
            yield node.module


def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
    """Find Airflow modules imported in the given file."""
    try:
        parsed = ast.parse(Path(file_path).read_bytes())
    except Exception:
        # Unreadable or unparsable files simply contribute no imports.
        return
    yield from (name for name in _find_imported_modules(parsed) if name.startswith("airflow."))