blob: c02207951f5d5f85e119503bd6810774bb987369 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
import logging
import os
import re
import zipfile
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Generator, List, Optional, Pattern, Union
from airflow.configuration import conf
if TYPE_CHECKING:
import pathlib
# Module-level logger for this module.
log = logging.getLogger(__name__)
def TemporaryDirectory(*args, **kwargs):
    """
    Deprecated shim around :class:`tempfile.TemporaryDirectory`.

    Emits a ``DeprecationWarning`` attributed to the caller and then returns a
    ``tempfile.TemporaryDirectory`` constructed with the same arguments.
    """
    import warnings
    from tempfile import TemporaryDirectory as _StdlibTemporaryDirectory

    deprecation_message = "This function is deprecated. Please use `tempfile.TemporaryDirectory`"
    warnings.warn(deprecation_message, DeprecationWarning, stacklevel=2)
    return _StdlibTemporaryDirectory(*args, **kwargs)
def mkdirs(path, mode):
    """
    Creates the directory specified by path, creating intermediate directories
    as necessary. If directory already exists, this is a no-op.

    Deprecated: use ``pathlib.Path(path).mkdir(mode=mode, parents=True, exist_ok=True)``,
    which is exactly what this function delegates to.

    :param path: The directory to create
    :type path: str
    :param mode: The mode requested for the final directory, e.g. 0o755.
        Like ``pathlib.Path.mkdir``, it is combined with the process umask
        (the umask is NOT ignored). It is not applied to missing intermediate
        directories, which get default permissions. It is also not applied
        to a directory that already exists.
    :type mode: int
    """
    import warnings

    warnings.warn(
        f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`",
        DeprecationWarning,
        stacklevel=2,
    )
    Path(path).mkdir(mode=mode, parents=True, exist_ok=True)
# Splits a path around an embedded zip archive.
# - When the path contains "<something>.zip" followed by the OS path separator,
#   group 2 is the part up to and including that ".zip". Because ".*" is greedy,
#   the LAST such occurrence is used. Group 3 is the remainder after the separator.
# - Otherwise groups 1 and 2 are None and group 3 is the path (up to any newline,
#   since "." does not match it).
# The ".zip" match is case-sensitive and purely textual. Callers in this module
# confirm the candidate with zipfile.is_zipfile before treating it as an archive.
ZIP_REGEX = re.compile(fr'((.*\.zip){re.escape(os.sep)})?(.*)')
def correct_maybe_zipped(fileloc):
    """
    Return the enclosing zip archive's path when ``fileloc`` points inside one.

    If a leading part of ``fileloc`` ends in ``.zip`` followed by a path
    separator, and that part is a valid zip file, the archive path is
    returned. Otherwise ``fileloc`` is returned unchanged.
    """
    candidate_archive = ZIP_REGEX.search(fileloc).group(2)
    points_into_zip = bool(candidate_archive) and zipfile.is_zipfile(candidate_archive)
    return candidate_archive if points_into_zip else fileloc
def open_maybe_zipped(fileloc, mode='r'):
    """
    Open ``fileloc``, reading from inside a zip archive when the path points into one.

    If a leading part of the path ends in ``.zip`` followed by a path separator,
    and that part is a valid zip file, the rest of the path is opened as a
    member of the archive and wrapped in :class:`io.TextIOWrapper`. Otherwise
    the path is opened with the builtin :func:`open`.

    :param fileloc: path to open, possibly of the form ``<archive>.zip<sep><member>``
    :param mode: passed to ``zipfile.ZipFile`` for archives, otherwise to ``open``
    :return: a file object, as in `open`, or a text wrapper around `ZipFile.open`.
    """
    parts = ZIP_REGEX.search(fileloc)
    archive, member = parts.group(2), parts.group(3)
    if not (archive and zipfile.is_zipfile(archive)):
        return open(fileloc, mode=mode)
    # NOTE(review): ``mode`` goes to ZipFile, which only accepts 'r'/'w'/'x'/'a'.
    # A binary mode such as 'rb' works for plain files but not for archives.
    # The ZipFile object is never closed explicitly here; confirm that callers
    # are fine relying on garbage collection for it.
    archive_handle = zipfile.ZipFile(archive, mode=mode)
    return io.TextIOWrapper(archive_handle.open(member))
def find_path_from_directory(base_dir_path: str, ignore_file_name: str) -> Generator[str, None, None]:
    """
    Search the file and return the path of the file that should not be ignored.

    The tree under ``base_dir_path`` is walked (following symlinked directories).
    In every visited directory, a file named ``ignore_file_name`` is read if it
    exists. Each of its non-empty lines is compiled as a regular expression,
    after removing any ``#`` comment and the whitespace preceding it. A pattern
    applies to the directory whose ignore file defines it and to all of that
    directory's subdirectories.

    A subdirectory or file is skipped when any applicable pattern matches
    (``re.search``) its path relative to ``base_dir_path``. For entries directly
    under the base directory, that relative path starts with ``"." + os.sep``.
    Skipped subdirectories are not descended into. The ignore files themselves
    are never yielded.

    Lines that are not valid regular expressions are logged and skipped instead
    of aborting the whole walk.

    :param base_dir_path: the base path to be searched for.
    :param ignore_file_name: the name of the file containing the regular expression patterns.
    :return: generator of paths (joined onto ``base_dir_path``) of files not to be ignored.
    """
    base_dir = str(base_dir_path)
    patterns_by_dir: Dict[str, List[Pattern[str]]] = {}
    for root, dirs, files in os.walk(base_dir, followlinks=True):
        # Patterns inherited from ancestor directories (each subdirectory gets its own copy below).
        patterns: List[Pattern[str]] = patterns_by_dir.get(root, [])
        ignore_file_path = os.path.join(root, ignore_file_name)
        if os.path.isfile(ignore_file_path):
            with open(ignore_file_path) as ignore_file:
                # splitlines() also removes the "\r" of CRLF line endings. With
                # split("\n") it would remain part of every pattern and prevent matches.
                lines_no_comments = [re.sub(r"\s*#.*", "", line) for line in ignore_file.read().splitlines()]
            for line in lines_no_comments:
                if not line:
                    continue
                try:
                    patterns.append(re.compile(line))
                except re.error as err:
                    # A single bad line must not stop DAG discovery for the whole folder.
                    log.warning("Ignoring invalid regex '%s' from %s: %s", line, ignore_file_path, err)
            patterns = list(set(patterns))

        rel_root = os.path.relpath(root, base_dir)
        # Prune ignored subdirectories in place so os.walk does not descend into them.
        dirs[:] = [
            subdir for subdir in dirs if not any(p.search(os.path.join(rel_root, subdir)) for p in patterns)
        ]
        patterns_by_dir.update({os.path.join(root, sd): patterns.copy() for sd in dirs})

        for file_name in files:
            if file_name == ignore_file_name:
                continue
            abs_file_path = os.path.join(root, str(file_name))
            rel_file_path = os.path.join(rel_root, str(file_name))
            if any(p.search(rel_file_path) for p in patterns):
                continue
            yield str(abs_file_path)
def list_py_file_paths(
    directory: Union[str, "pathlib.Path"],
    safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE', fallback=True),
    include_examples: Optional[bool] = None,
    include_smart_sensor: Optional[bool] = conf.getboolean('smart_sensor', 'use_smart_sensor'),
):
    """
    Traverse a directory and look for Python files.

    :param directory: the directory to traverse. If it is a path to a single
        file, only that path is returned. ``None`` contributes no paths.
    :type directory: str or pathlib.Path
    :param safe_mode: whether to use a heuristic to determine whether a file
        contains Airflow DAG definitions. If not provided, use the
        core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
        to safe. Note: this default is evaluated once, when the module is imported.
    :type safe_mode: bool
    :param include_examples: include example DAGs. If ``None``, the
        core.LOAD_EXAMPLES configuration setting is read on each call.
    :type include_examples: bool
    :param include_smart_sensor: include smart sensor native control DAGs. If
        not provided, use the smart_sensor.use_smart_sensor configuration
        setting. This default is evaluated once, when the module is imported.
    :type include_smart_sensor: bool
    :return: a list of paths to Python files in the specified directory
    :rtype: list[str]
    """
    if include_examples is None:
        include_examples = conf.getboolean('core', 'LOAD_EXAMPLES')
    file_paths: List[str] = []
    if directory is not None:
        if os.path.isfile(directory):
            file_paths.append(str(directory))
        elif os.path.isdir(directory):
            file_paths.extend(find_dag_file_paths(directory, safe_mode))
    if include_examples:
        from airflow import example_dags

        example_dag_folder = example_dags.__path__[0]  # type: ignore
        # Bundled folders are scanned without recursing into examples/smart sensors again.
        file_paths.extend(
            list_py_file_paths(example_dag_folder, safe_mode, include_examples=False, include_smart_sensor=False)
        )
    if include_smart_sensor:
        from airflow import smart_sensor_dags

        smart_sensor_dag_folder = smart_sensor_dags.__path__[0]  # type: ignore
        file_paths.extend(
            list_py_file_paths(
                smart_sensor_dag_folder, safe_mode, include_examples=False, include_smart_sensor=False
            )
        )
    return file_paths
def find_dag_file_paths(directory: Union[str, "pathlib.Path"], safe_mode: bool) -> List[str]:
    """
    Collect the paths under ``directory`` that look like DAG files.

    Candidates come from ``find_path_from_directory``, which honours
    ``.airflowignore`` files. A candidate is kept when all of these hold:
    it is a regular file, it has a ``.py`` extension or is a zip archive,
    and ``might_contain_dag`` accepts it.

    Errors raised while examining an individual candidate are logged, and
    that candidate is skipped. Errors raised by the directory walk itself
    are not caught here.

    :param directory: root directory to search
    :param safe_mode: forwarded to ``might_contain_dag``
    :return: matching file paths, in discovery order
    """
    dag_files: List[str] = []
    for candidate in find_path_from_directory(str(directory), ".airflowignore"):
        try:
            if os.path.isfile(candidate):
                extension = os.path.splitext(os.path.basename(candidate))[1]
                supported_type = extension == '.py' or zipfile.is_zipfile(candidate)
                if supported_type and might_contain_dag(candidate, safe_mode):
                    dag_files.append(candidate)
        except Exception:
            log.exception("Error while examining %s", candidate)
    return dag_files
# Matches a "#" comment together with any whitespace immediately before it.
# NOTE(review): not referenced in the visible part of this module.
# find_path_from_directory inlines the same regex. Confirm there are no
# external users before removing or consolidating it.
COMMENT_PATTERN = re.compile(r"\s*#.*")
def might_contain_dag(file_path: str, safe_mode: bool, zip_file: Optional[zipfile.ZipFile] = None):
    """
    Heuristically decide whether a Python file may define an Airflow DAG.

    With safe mode off, every file qualifies. With safe mode on:
    - A local path that is itself a zip archive always qualifies.
    - Otherwise the file's raw bytes are lower-cased and must contain both
      ``dag`` and ``airflow``.

    :param file_path: path of the file to check, or the member name when ``zip_file`` is given.
    :param safe_mode: whether safe mode is active. If not, this function always returns True.
    :param zip_file: if passed, read ``file_path`` from this archive instead of the local filesystem.
    :return: True if the file might contain DAGs.
    """
    if not safe_mode:
        return True
    if not zip_file and zipfile.is_zipfile(file_path):
        return True
    source = zip_file.open(file_path) if zip_file else open(file_path, 'rb')
    with source as handle:
        raw_bytes = handle.read()
    lowered = raw_bytes.lower()
    return b'dag' in lowered and b'airflow' in lowered