blob: 5b55cff432657a709767e1ada1b26338ef281016 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import ast
import hashlib
import logging
import os
import zipfile
from io import TextIOWrapper
from pathlib import Path
from typing import Generator, NamedTuple, Pattern, Protocol, overload
import re2
from pathspec.patterns import GitWildMatchPattern
from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning
# Module-level logger, following the file's `logger = getLogger(__name__)` convention.
log = logging.getLogger(__name__)

# Format template used to build a collision-free module name when importing a DAG
# file (see get_unique_dag_module_name below): the sha1 of the file path plus the
# original module name.
MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"
class _IgnoreRule(Protocol):
    """Interface for ignore rules, used for structural subtyping (no explicit inheritance needed)."""

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Build an ignore rule from the supplied pattern.

        ``base_dir`` and ``definition_file`` should be absolute paths.
        Implementations return ``None`` (rather than raising) for patterns that
        cannot be compiled, so a single bad line does not abort parsing.
        """

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Match a candidate absolute path against a list of rules; True means "ignore this path"."""
class _RegexpIgnoreRule(NamedTuple):
    """Regexp ignore rule: a compiled pattern plus the base directory it is evaluated against."""

    pattern: Pattern
    base_dir: Path

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """Build an ignore rule from the supplied regexp pattern and log a useful warning if it is invalid."""
        try:
            compiled = re2.compile(pattern)
        except re2.error as e:
            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
            return None
        return _RegexpIgnoreRule(compiled, base_dir)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Return True if any rule's regexp is found within ``path`` relative to that rule's base dir."""
        for rule in rules:
            if not isinstance(rule, _RegexpIgnoreRule):
                raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}")
            candidate = str(path.relative_to(rule.base_dir))
            if rule.pattern.search(candidate) is not None:
                return True
        return False
class _GlobIgnoreRule(NamedTuple):
    """Glob (gitignore-style) ignore rule.

    Fields:

    - ``pattern``: regex produced by ``GitWildMatchPattern`` from the glob text.
    - ``raw_pattern``: the original glob, kept to detect directory-only rules ("foo/").
    - ``include``: ``GitWildMatchPattern.include`` — False for "!" negation rules,
      None for no-op patterns.
    - ``relative_to``: directory the rule is anchored to, or None when the rule may
      match at any depth below its definition file.
    """

    pattern: Pattern
    raw_pattern: str
    include: bool | None = None
    relative_to: Path | None = None

    @staticmethod
    def compile(pattern: str, _, definition_file: Path) -> _IgnoreRule | None:
        """Build an ignore rule from the supplied glob pattern and log a useful warning if it is invalid."""
        relative_to: Path | None = None
        if pattern.strip() == "/":
            # "/" doesn't match anything in gitignore
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None
        if pattern.startswith("/") or "/" in pattern.rstrip("/"):
            # See https://git-scm.com/docs/gitignore
            # > If there is a separator at the beginning or middle (or both) of the pattern, then the
            # > pattern is relative to the directory level of the particular .gitignore file itself.
            # > Otherwise the pattern may also match at any level below the .gitignore level.
            relative_to = definition_file.parent
        ignore_pattern = GitWildMatchPattern(pattern)
        return _GlobIgnoreRule(ignore_pattern.regex, pattern, ignore_pattern.include, relative_to)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Match a list of ignore rules against the supplied path.

        All rules are evaluated in order and the *last* matching rule wins, so
        later "!" negation rules can override earlier matches (gitignore semantics).
        """
        matched = False
        for r in rules:
            if not isinstance(r, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(r)}")
            rule: _GlobIgnoreRule = r  # explicit typing to make mypy play nicely
            if rule.relative_to:
                # BUGFIX: use as_posix() rather than str() so the candidate always has
                # "/" separators; gitwildmatch-translated regexes are defined over
                # "/"-separated paths, so str() broke anchored rules under os.sep="\\".
                # Identical to the previous behavior on POSIX.
                rel_path = path.relative_to(rule.relative_to).as_posix()
            else:
                # Un-anchored rules are matched against the basename only.
                rel_path = path.name
            if rule.raw_pattern.endswith("/") and path.is_dir():
                # ensure the test path will potentially match a directory pattern if it is a directory
                rel_path += "/"
            if rule.include is not None and rule.pattern.match(rel_path) is not None:
                matched = rule.include
        return matched
def TemporaryDirectory(*args, **kwargs):
    """Deprecated pass-through to `tempfile.TemporaryDirectory`; warns and delegates."""
    import warnings
    from tempfile import TemporaryDirectory as _TmpDir

    # Warn at the caller's frame so the deprecation points at user code.
    warnings.warn(
        "This function is deprecated. Please use `tempfile.TemporaryDirectory`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    return _TmpDir(*args, **kwargs)
def mkdirs(path, mode):
    """
    Create directory ``path``, creating intermediate directories as necessary.

    A no-op when the directory already exists. Deprecated in favour of
    ``pathlib.Path.mkdir``.

    :param path: The directory to create
    :param mode: The mode to give to the directory e.g. 0o755, ignores umask
    """
    import warnings

    warnings.warn(
        f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    Path(path).mkdir(mode=mode, parents=True, exist_ok=True)
ZIP_REGEX = re2.compile(rf"((.*\.zip){re2.escape(os.sep)})?(.*)")
@overload
def correct_maybe_zipped(fileloc: None) -> None: ...


@overload
def correct_maybe_zipped(fileloc: str | Path) -> str | Path: ...


def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
    """If the path contains a folder with a .zip suffix, treat it as a zip archive and return path."""
    if not fileloc:
        # None / empty string: nothing to normalize.
        return fileloc
    m = ZIP_REGEX.search(str(fileloc))
    if m is None:
        return fileloc
    archive = m.group(2)
    # Only rewrite the location when the ".zip" component is a real zip archive.
    if archive and zipfile.is_zipfile(archive):
        return archive
    return fileloc
def open_maybe_zipped(fileloc, mode="r"):
    """
    Open the given file.

    If the path contains a folder with a .zip suffix, then the folder
    is treated as a zip archive, opening the file inside the archive.

    :return: a file object, as in `open`, or as in `ZipFile.open`.
    """
    m = ZIP_REGEX.search(fileloc)
    archive, filename = m.group(2), m.group(3)
    if not (archive and zipfile.is_zipfile(archive)):
        # Ordinary file on the local filesystem.
        return open(fileloc, mode=mode)
    # NOTE(review): the ZipFile object is not explicitly closed here; presumably the
    # open member handle keeps the underlying archive readable — confirm before changing.
    return TextIOWrapper(zipfile.ZipFile(archive, mode=mode).open(filename))
def _find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_rule_type: type[_IgnoreRule],
) -> Generator[str, None, None]:
    """Recursively search the base path and return the list of file paths that should not be ignored.

    Rules accumulate while descending: each directory starts from the rules
    recorded for it in ``patterns_by_dir`` (inherited from its parent) and adds
    any from its own ignore file.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the file name containing regular expressions for files that should be ignored.
    :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.
    :return: a generator of file paths which should not be ignored.
    """
    # A Dict of patterns, keyed using resolved, absolute paths
    patterns_by_dir: dict[Path, list[_IgnoreRule]] = {}
    # followlinks=True descends into symlinked directories; the explicit cycle
    # check below guards against infinite recursion through link loops.
    for root, dirs, files in os.walk(base_dir_path, followlinks=True):
        # Rules inherited from the parent directory (empty at the walk root).
        patterns: list[_IgnoreRule] = patterns_by_dir.get(Path(root).resolve(), [])
        ignore_file_path = Path(root) / ignore_file_name
        if ignore_file_path.is_file():
            with open(ignore_file_path) as ifile:
                # Strip trailing "# ..." comments from every line before compiling.
                lines_no_comments = [re2.sub(r"\s*#.*", "", line) for line in ifile.read().split("\n")]
                # append new patterns and filter out "None" objects, which are invalid patterns
                patterns += [
                    p
                    for p in [
                        ignore_rule_type.compile(line, Path(base_dir_path), ignore_file_path)
                        for line in lines_no_comments
                        if line
                    ]
                    if p is not None
                ]
                # evaluation order of patterns is important with negation
                # so that later patterns can override earlier patterns
                # (dict.fromkeys de-duplicates while preserving insertion order)
                patterns = list(dict.fromkeys(patterns))
        # Prune ignored directories in place so os.walk never descends into them.
        dirs[:] = [subdir for subdir in dirs if not ignore_rule_type.match(Path(root) / subdir, patterns)]
        # explicit loop for infinite recursion detection since we are following symlinks in this walk
        for sd in dirs:
            dirpath = (Path(root) / sd).resolve()
            if dirpath in patterns_by_dir:
                raise RuntimeError(
                    "Detected recursive loop when walking DAG directory "
                    f"{base_dir_path}: {dirpath} has appeared more than once."
                )
            # Record a snapshot of the current rules for each subdirectory.
            patterns_by_dir.update({dirpath: patterns.copy()})
        for file in files:
            if file != ignore_file_name:
                abs_file_path = Path(root) / file
                # Yield only files no rule matches; the ignore file itself is skipped.
                if not ignore_rule_type.match(abs_file_path, patterns):
                    yield str(abs_file_path)
def find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_file_syntax: str = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="regexp"),
) -> Generator[str, None, None]:
    """Recursively search the base path for a list of file paths that should not be ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the file name in which specifies the patterns of files/dirs to be ignored
    :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob
    :return: a generator of file paths.
    """
    # An empty/falsy syntax value falls back to regexp, matching historic behavior.
    if not ignore_file_syntax or ignore_file_syntax == "regexp":
        return _find_path_from_directory(base_dir_path, ignore_file_name, _RegexpIgnoreRule)
    if ignore_file_syntax == "glob":
        return _find_path_from_directory(base_dir_path, ignore_file_name, _GlobIgnoreRule)
    raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
def list_py_file_paths(
    directory: str | os.PathLike[str] | None,
    safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
    include_examples: bool | None = None,
) -> list[str]:
    """Traverse a directory and look for Python files.

    :param directory: the directory to traverse
    :param safe_mode: whether to use a heuristic to determine whether a file
        contains Airflow DAG definitions. If not provided, use the
        core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
        to safe.
    :param include_examples: include example DAGs
    :return: a list of paths to Python files in the specified directory
    """
    if include_examples is None:
        include_examples = conf.getboolean("core", "LOAD_EXAMPLES")
    file_paths: list[str] = []
    if directory is not None:
        if os.path.isfile(directory):
            # A single file: return it as-is.
            file_paths.append(str(directory))
        elif os.path.isdir(directory):
            file_paths.extend(find_dag_file_paths(directory, safe_mode))
        # Anything else (missing path, special file) yields no DAG files.
    if include_examples:
        from airflow import example_dags

        # example_dags is a package; its __path__ holds the on-disk folder.
        example_dag_folder = next(iter(example_dags.__path__))
        file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, include_examples=False))
    return file_paths
def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]:
    """Find file paths of all DAG files."""
    dag_files: list[str] = []
    for candidate in find_path_from_directory(directory, ".airflowignore"):
        path = Path(candidate)
        try:
            if not path.is_file():
                continue
            # Only .py files and zip archives can hold DAG definitions.
            if path.suffix == ".py" or zipfile.is_zipfile(path):
                if might_contain_dag(candidate, safe_mode):
                    dag_files.append(candidate)
        except Exception:
            # Best-effort: a single unreadable file must not abort discovery.
            log.exception("Error while examining %s", candidate)
    return dag_files
COMMENT_PATTERN = re2.compile(r"\s*#.*")
def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Check whether a Python file contains Airflow DAGs.

    When safe_mode is off (with False value), this function always returns True.

    If might_contain_dag_callable isn't specified, it uses airflow default heuristic
    """
    if not safe_mode:
        # Safe mode disabled: treat every file as a potential DAG file.
        return True
    # Resolve the (possibly user-configured) checker callable from config.
    checker = conf.getimport(
        "core",
        "might_contain_dag_callable",
        fallback="airflow.utils.file.might_contain_dag_via_default_heuristic",
    )
    return checker(file_path=file_path, zip_file=zip_file)
def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
"""
Heuristic that guesses whether a Python file contains an Airflow DAG definition.
:param file_path: Path to the file to be checked.
:param zip_file: if passed, checks the archive. Otherwise, check local filesystem.
:return: True, if file might contain DAGs.
"""
if zip_file:
with zip_file.open(file_path) as current_file:
content = current_file.read()
else:
if zipfile.is_zipfile(file_path):
return True
with open(file_path, "rb") as dag_file:
content = dag_file.read()
content = content.lower()
return all(s in content for s in (b"dag", b"airflow"))
def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
    """Yield the module name of every top-level ``import`` / ``from ... import`` statement."""
    for node in module.body:
        if isinstance(node, ast.Import):
            yield from (alias.name for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module is not None:
            # node.module is None for bare relative imports ("from . import x").
            yield node.module
def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
    """Find Airflow modules imported in the given file."""
    try:
        tree = ast.parse(Path(file_path).read_bytes())
    except Exception:
        # Unreadable or unparsable file: yield nothing rather than raising.
        return
    yield from (name for name in _find_imported_modules(tree) if name.startswith("airflow."))
def get_unique_dag_module_name(file_path: str) -> str:
    """Return a unique module name in the format unusual_prefix_{sha1 of module's file path}_{original module name}."""
    # Guard clause: anything but a str cannot be hashed into a stable module name.
    if not isinstance(file_path, str):
        raise ValueError("file_path should be a string to generate unique module name")
    path_hash = hashlib.sha1(file_path.encode("utf-8")).hexdigest()
    return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=Path(file_path).stem)